/*****************************************************************************

Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2016, 2023, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file row/row0ins.cc
Insert into a table

Created 4/20/1996 Heikki Tuuri
*******************************************************/

#include "row0ins.h"
#include "dict0dict.h"
#include "trx0rec.h"
#include "trx0undo.h"
#include "btr0btr.h"
#include "btr0cur.h"
#include "mach0data.h"
#include "que0que.h"
#include "row0upd.h"
#include "row0sel.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "log0log.h"
#include "eval0eval.h"
#include "data0data.h"
#include "buf0lru.h"
#include "fts0fts.h"
#include "fts0types.h"
#ifdef BTR_CUR_HASH_ADAPT
# include "btr0sea.h"
#endif
#ifdef WITH_WSREP
#include <wsrep.h>
#include <mysql/service_wsrep.h>
#include "ha_prototypes.h"
#endif /* WITH_WSREP */

/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before for that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST not hold any synchonization objects when performing the
check.
If you make a change in this module make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */

/** Create an row template for each index of a table. */
static void ins_node_create_entry_list(ins_node_t *node)
{
  node->entry_list.reserve(UT_LIST_GET_LEN(node->table->indexes));

  for (dict_index_t *index= dict_table_get_first_index(node->table); index;
       index= dict_table_get_next_index(index))
  {
    /* Corrupted or incomplete secondary indexes will be filtered out in
    row_ins(). */
    dtuple_t *entry= index->online_status >= ONLINE_INDEX_ABORTED
      ? dtuple_create(node->entry_sys_heap, 0)
      : row_build_index_entry_low(node->row, NULL, index, node->entry_sys_heap,
				  ROW_BUILD_FOR_INSERT);
    node->entry_list.push_back(entry);
  }
}

/*****************************************************************//**
Adds system field buffers to a row. */
static
void
row_ins_alloc_sys_fields(
/*=====================*/
	ins_node_t*	node)	/*!< in: insert node */
{
	dtuple_t*		row;
	dict_table_t*		table;
	const dict_col_t*	col;
	dfield_t*		dfield;

	row = node->row;
	table = node->table;

	ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));

	/* allocate buffer to hold the needed system created hidden columns. */
	compile_time_assert(DATA_ROW_ID_LEN
			    + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN
			    == sizeof node->sys_buf);
	memset(node->sys_buf, 0, sizeof node->sys_buf);
	/* Assign DB_ROLL_PTR to 1 << ROLL_PTR_INSERT_FLAG_POS */
	node->sys_buf[DATA_ROW_ID_LEN + DATA_TRX_ID_LEN] = 0x80;
	ut_ad(!memcmp(node->sys_buf + DATA_ROW_ID_LEN, reset_trx_id,
		      sizeof reset_trx_id));

	/* 1. Populate row-id */
	col = dict_table_get_sys_col(table, DATA_ROW_ID);

	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));

	dfield_set_data(dfield, node->sys_buf, DATA_ROW_ID_LEN);

	/* 2. Populate trx id */
	col = dict_table_get_sys_col(table, DATA_TRX_ID);

	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));

	dfield_set_data(dfield, &node->sys_buf[DATA_ROW_ID_LEN],
			DATA_TRX_ID_LEN);

	col = dict_table_get_sys_col(table, DATA_ROLL_PTR);

	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));

	dfield_set_data(dfield, &node->sys_buf[DATA_ROW_ID_LEN
					       + DATA_TRX_ID_LEN],
			DATA_ROLL_PTR_LEN);
}

/*********************************************************************//**
Sets a new row to insert for an INS_DIRECT node. This function is only used
if we have constructed the row separately, which is a rare case; this
function is quite slow. */
void
ins_node_set_new_row(
/*=================*/
	ins_node_t*	node,	/*!< in: insert node */
	dtuple_t*	row)	/*!< in: new row (or first row) for the node */
{
	node->state = INS_NODE_SET_IX_LOCK;
	node->index = NULL;
	node->entry_list.clear();
	node->entry = node->entry_list.end();

	node->row = row;

	mem_heap_empty(node->entry_sys_heap);

	/* Create templates for index entries */

	ins_node_create_entry_list(node);

	/* Allocate from entry_sys_heap buffers for sys fields */

	row_ins_alloc_sys_fields(node);

	/* As we allocated a new trx id buf, the trx id should be written
	there again: */

	node->trx_id = 0;
}

/*******************************************************************//**
Does an insert operation by updating a delete-marked existing record
in the index. This situation can occur if the delete-marked record is
kept in the index for consistent reads.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_sec_index_entry_by_modify(
/*==============================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_INSERT_TREE,
				depending on whether mtr holds just a leaf
				latch or also a tree latch */
	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
	rec_offs**	offsets,/*!< in/out: offsets on cursor->page_cur.rec */
	mem_heap_t*	offsets_heap,
				/*!< in/out: memory heap that can be emptied */
	mem_heap_t*	heap,	/*!< in/out: memory heap */
	const dtuple_t*	entry,	/*!< in: index entry to insert */
	que_thr_t*	thr,	/*!< in: query thread */
	mtr_t*		mtr)	/*!< in: mtr; must be committed before
				latching any further pages */
{
	big_rec_t*	dummy_big_rec;
	upd_t*		update;
	rec_t*		rec;
	dberr_t		err;

	rec = btr_cur_get_rec(cursor);

	ut_ad(!cursor->index()->is_clust());
	ut_ad(rec_offs_validate(rec, cursor->index(), *offsets));
	ut_ad(!entry->info_bits);

	/* We know that in the alphabetical ordering, entry and rec are
	identified. But in their binary form there may be differences if
	there are char fields in them. Therefore we have to calculate the
	difference. */

	update = row_upd_build_sec_rec_difference_binary(
		rec, cursor->index(), *offsets, entry, heap);

	if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
		/* We should never insert in place of a record that
		has not been delete-marked. The only exception is when
		online CREATE INDEX copied the changes that we already
		made to the clustered index, and completed the
		secondary index creation before we got here. In this
		case, the change would already be there. The CREATE
		INDEX should be in wait_while_table_is_used() at least
		until this INSERT or UPDATE returns. After that point,
		set_committed(true) would be invoked in
		commit_inplace_alter_table(). */
		ut_a(update->n_fields == 0);
		ut_ad(!dict_index_is_online_ddl(cursor->index()));
		return cursor->index()->is_committed()
			? DB_CORRUPTION : DB_SUCCESS;
	}

	if (mode == BTR_MODIFY_LEAF) {
		/* Try an optimistic updating of the record, keeping changes
		within the page */

		/* TODO: pass only *offsets */
		err = btr_cur_optimistic_update(
			flags | BTR_KEEP_SYS_FLAG, cursor,
			offsets, &offsets_heap, update, 0, thr,
			thr_get_trx(thr)->id, mtr);
		switch (err) {
		case DB_OVERFLOW:
		case DB_UNDERFLOW:
		case DB_ZIP_OVERFLOW:
			err = DB_FAIL;
		default:
			break;
		}
	} else {
		ut_ad(mode == BTR_INSERT_TREE);
		if (buf_pool.running_out()) {

			return(DB_LOCK_TABLE_FULL);
		}

		err = btr_cur_pessimistic_update(
			flags | BTR_KEEP_SYS_FLAG, cursor,
			offsets, &offsets_heap,
			heap, &dummy_big_rec, update, 0,
			thr, thr_get_trx(thr)->id, mtr);
		ut_ad(!dummy_big_rec);
	}

	return(err);
}

/*******************************************************************//**
Does an insert operation by delete unmarking and updating a delete marked
existing record in the index. This situation can occur if the delete marked
record is kept in the index for consistent reads.
@return DB_SUCCESS, DB_FAIL, or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_clust_index_entry_by_modify(
/*================================*/
	btr_pcur_t*	pcur,	/*!< in/out: a persistent cursor pointing
				to the clust_rec that is being modified. */
	ulint		flags,	/*!< in: undo logging and locking flags */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
				depending on whether mtr holds just a leaf
				latch or also a tree latch */
	rec_offs**	offsets,/*!< out: offsets on cursor->page_cur.rec */
	mem_heap_t**	offsets_heap,
				/*!< in/out: pointer to memory heap that can
				be emptied, or NULL */
	mem_heap_t*	heap,	/*!< in/out: memory heap */
	const dtuple_t*	entry,	/*!< in: index entry to insert */
	que_thr_t*	thr,	/*!< in: query thread */
	mtr_t*		mtr)	/*!< in: mtr; must be committed before
				latching any further pages */
{
	const rec_t*	rec;
	upd_t*		update;
	dberr_t		err = DB_SUCCESS;
	btr_cur_t*	cursor	= btr_pcur_get_btr_cur(pcur);
	TABLE*		mysql_table = NULL;
	ut_ad(cursor->index()->is_clust());

	rec = btr_cur_get_rec(cursor);

	ut_ad(rec_get_deleted_flag(rec,
				   cursor->index()->table->not_redundant()));
	/* In delete-marked records, DB_TRX_ID must
	always refer to an existing undo log record. */
	ut_ad(rec_get_trx_id(rec, cursor->index()));

	/* Build an update vector containing all the fields to be modified;
	NOTE that this vector may NOT contain system columns trx_id or
	roll_ptr */
	if (thr->prebuilt != NULL) {
		mysql_table = thr->prebuilt->m_mysql_table;
		ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
	}

	update = row_upd_build_difference_binary(
		cursor->index(), entry, rec, NULL, true, true,
		thr_get_trx(thr), heap, mysql_table, &err);
	if (err != DB_SUCCESS) {
		return(err);
	}

	if (mode != BTR_MODIFY_TREE) {
		ut_ad(mode == BTR_MODIFY_LEAF
		      || mode == BTR_MODIFY_LEAF_ALREADY_LATCHED
		      || mode == BTR_MODIFY_ROOT_AND_LEAF
		      || mode == BTR_MODIFY_ROOT_AND_LEAF_ALREADY_LATCHED);

		/* Try optimistic updating of the record, keeping changes
		within the page */

		err = btr_cur_optimistic_update(
			flags, cursor, offsets, offsets_heap, update, 0, thr,
			thr_get_trx(thr)->id, mtr);
		switch (err) {
		case DB_OVERFLOW:
		case DB_UNDERFLOW:
		case DB_ZIP_OVERFLOW:
			err = DB_FAIL;
		default:
			break;
		}
	} else {
		if (buf_pool.running_out()) {
			return DB_LOCK_TABLE_FULL;
		}

		big_rec_t*	big_rec	= NULL;

		err = btr_cur_pessimistic_update(
			flags | BTR_KEEP_POS_FLAG,
			cursor, offsets, offsets_heap, heap,
			&big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);

		if (big_rec) {
			ut_a(err == DB_SUCCESS);

			DEBUG_SYNC_C("before_row_ins_upd_extern");
			err = btr_store_big_rec_extern_fields(
				pcur, *offsets, big_rec, mtr,
				BTR_STORE_INSERT_UPDATE);
			DEBUG_SYNC_C("after_row_ins_upd_extern");
			dtuple_big_rec_free(big_rec);
		}
	}

	return(err);
}

/*********************************************************************//**
Returns TRUE if in a cascaded update/delete an ancestor node of node
updates (not DELETE, but UPDATE) table.
@return TRUE if an ancestor updates table */
static
ibool
row_ins_cascade_ancestor_updates_table(
/*===================================*/
	que_node_t*	node,	/*!< in: node in a query graph */
	dict_table_t*	table)	/*!< in: table */
{
	que_node_t*	parent;

	for (parent = que_node_get_parent(node);
	     que_node_get_type(parent) == QUE_NODE_UPDATE;
	     parent = que_node_get_parent(parent)) {

		upd_node_t*	upd_node;

		upd_node = static_cast<upd_node_t*>(parent);

		if (upd_node->table == table && !upd_node->is_delete) {

			return(TRUE);
		}
	}

	return(FALSE);
}

/*********************************************************************//**
Returns the number of ancestor UPDATE or DELETE nodes of a
cascaded update/delete node.
@return number of ancestors */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
ulint
row_ins_cascade_n_ancestors(
/*========================*/
	que_node_t*	node)	/*!< in: node in a query graph */
{
	que_node_t*	parent;
	ulint		n_ancestors = 0;

	for (parent = que_node_get_parent(node);
	     que_node_get_type(parent) == QUE_NODE_UPDATE;
	     parent = que_node_get_parent(parent)) {

		n_ancestors++;
	}

	return(n_ancestors);
}

/******************************************************************//**
Calculates the update vector node->cascade->update for a child table in
a cascaded update.
@return whether any FULLTEXT INDEX is affected */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
bool
row_ins_cascade_calc_update_vec(
/*============================*/
	upd_node_t*	node,		/*!< in: update node of the parent
					table */
	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
					type is != 0 */
	mem_heap_t*	heap,		/*!< in: memory heap to use as
					temporary storage */
	trx_t*		trx)		/*!< in: update transaction */
{
	upd_node_t*     cascade         = node->cascade_node;
	dict_table_t*	table		= foreign->foreign_table;
	dict_index_t*	index		= foreign->foreign_index;
	upd_t*		update;
	dict_table_t*	parent_table;
	dict_index_t*	parent_index;
	upd_t*		parent_update;
	ulint		n_fields_updated;
	ulint		parent_field_no;
	ulint		i;
	ulint		j;
	bool		doc_id_updated = false;
	unsigned	doc_id_pos = 0;
	doc_id_t	new_doc_id = FTS_NULL_DOC_ID;
	ulint		prefix_col;

	ut_a(cascade);
	ut_a(table);
	ut_a(index);

	/* Calculate the appropriate update vector which will set the fields
	in the child index record to the same value (possibly padded with
	spaces if the column is a fixed length CHAR or FIXBINARY column) as
	the referenced index record will get in the update. */

	parent_table = node->table;
	ut_a(parent_table == foreign->referenced_table);
	parent_index = foreign->referenced_index;
	parent_update = node->update;

	update = cascade->update;

	update->info_bits = 0;

	n_fields_updated = 0;

	bool affects_fulltext = foreign->affects_fulltext();

	if (table->fts) {
		doc_id_pos = dict_table_get_nth_col_pos(
			table, table->fts->doc_col, &prefix_col);
	}

	for (i = 0; i < foreign->n_fields; i++) {

		parent_field_no = dict_table_get_nth_col_pos(
			parent_table,
			dict_index_get_nth_col_no(parent_index, i),
			&prefix_col);

		for (j = 0; j < parent_update->n_fields; j++) {
			const upd_field_t*	parent_ufield
				= &parent_update->fields[j];

			if (parent_ufield->field_no == parent_field_no
			    && !(parent_ufield->new_val.type.prtype
				 & DATA_VIRTUAL)) {

				ulint			min_size;
				const dict_col_t*	col;
				ulint			ufield_len;
				upd_field_t*		ufield;

				col = dict_index_get_nth_col(index, i);

				/* A field in the parent index record is
				updated. Let us make the update vector
				field for the child table. */

				ufield = update->fields + n_fields_updated;

				ufield->field_no = static_cast<uint16_t>(
					dict_table_get_nth_col_pos(
						table, dict_col_get_no(col),
						&prefix_col));

				ufield->orig_len = 0;
				ufield->exp = NULL;

				ufield->new_val = parent_ufield->new_val;
				dfield_get_type(&ufield->new_val)->prtype |=
					col->prtype & DATA_VERSIONED;
				ufield_len = dfield_get_len(&ufield->new_val);

				/* Clear the "external storage" flag */
				dfield_set_len(&ufield->new_val, ufield_len);

				/* Do not allow a NOT NULL column to be
				updated as NULL */

				if (dfield_is_null(&ufield->new_val)
				    && (col->prtype & DATA_NOT_NULL)) {
					goto err_exit;
				}

				/* If the new value would not fit in the
				column, do not allow the update */

				if (!dfield_is_null(&ufield->new_val)
				    && dtype_get_at_most_n_mbchars(
					col->prtype,
					col->mbminlen, col->mbmaxlen,
					col->len,
					ufield_len,
					static_cast<char*>(
						dfield_get_data(
							&ufield->new_val)))
				    < ufield_len) {
					goto err_exit;
				}

				/* If the parent column type has a different
				length than the child column type, we may
				need to pad with spaces the new value of the
				child column */

				min_size = dict_col_get_min_size(col);

				/* Because UNIV_SQL_NULL (the marker
				of SQL NULL values) exceeds all possible
				values of min_size, the test below will
				not hold for SQL NULL columns. */

				if (min_size > ufield_len) {

					byte*	pad;
					ulint	pad_len;
					byte*	padded_data;
					ulint	mbminlen;

					padded_data = static_cast<byte*>(
						mem_heap_alloc(
							heap, min_size));

					pad = padded_data + ufield_len;
					pad_len = min_size - ufield_len;

					memcpy(padded_data,
					       dfield_get_data(&ufield
							       ->new_val),
					       ufield_len);

					mbminlen = dict_col_get_mbminlen(col);

					ut_ad(!(ufield_len % mbminlen));
					ut_ad(!(min_size % mbminlen));

					if (mbminlen == 1
					    && dtype_get_charset_coll(
						    col->prtype)
					    == DATA_MYSQL_BINARY_CHARSET_COLL) {
						/* Do not pad BINARY columns */
						goto err_exit;
					}

					row_mysql_pad_col(mbminlen,
							  pad, pad_len);
					dfield_set_data(&ufield->new_val,
							padded_data, min_size);
				}

				/* If Doc ID is updated, check whether the
				Doc ID is valid */
				if (table->fts
				    && ufield->field_no == doc_id_pos) {
					doc_id_t	n_doc_id;

					n_doc_id =
						table->fts->cache->next_doc_id;

					new_doc_id = fts_read_doc_id(
						static_cast<const byte*>(
							dfield_get_data(
							&ufield->new_val)));

					affects_fulltext = true;
					doc_id_updated = true;

					if (new_doc_id <= 0) {
						ib::error() << "FTS Doc ID"
							" must be larger than"
							" 0";
						goto err_exit;
					}

					if (new_doc_id < n_doc_id) {
						ib::error() << "FTS Doc ID"
							" must be larger than "
							<< n_doc_id - 1
							<< " for table "
							<< table->name;
						goto err_exit;
					}
				}

				n_fields_updated++;
			}
		}
	}

	if (affects_fulltext) {
		ut_ad(table->fts);

		if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
			doc_id_t	doc_id;
			doc_id_t*	next_doc_id;
			upd_field_t*	ufield;

			next_doc_id = static_cast<doc_id_t*>(mem_heap_alloc(
				heap, sizeof(doc_id_t)));

			ut_ad(!doc_id_updated);
			ufield = update->fields + n_fields_updated;
			fts_get_next_doc_id(table, next_doc_id);
			doc_id = fts_update_doc_id(table, ufield, next_doc_id);
			n_fields_updated++;
			fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
		} else  {
			if (doc_id_updated) {
				ut_ad(new_doc_id);
				fts_trx_add_op(trx, table, new_doc_id,
					       FTS_INSERT, NULL);
			} else {
				ib::error() << "FTS Doc ID must be updated"
					" along with FTS indexed column for"
					" table " << table->name;
err_exit:
				n_fields_updated = ULINT_UNDEFINED;
			}
		}
	}

	update->n_fields = n_fields_updated;

	return affects_fulltext;
}

/*********************************************************************//**
Set detailed error message associated with foreign key errors for
the given transaction. */
static
void
row_ins_set_detailed(
/*=================*/
	trx_t*		trx,		/*!< in: transaction */
	dict_foreign_t*	foreign)	/*!< in: foreign key constraint */
{
	ut_ad(!srv_read_only_mode);

	mysql_mutex_lock(&srv_misc_tmpfile_mutex);
	rewind(srv_misc_tmpfile);

	if (os_file_set_eof(srv_misc_tmpfile)) {
		ut_print_name(srv_misc_tmpfile, trx,
			      foreign->foreign_table_name);
		std::string fk_str = dict_print_info_on_foreign_key_in_create_format(
			trx, foreign, FALSE);
		fputs(fk_str.c_str(), srv_misc_tmpfile);
		trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
	} else {
		trx_set_detailed_error(trx, "temp file operation failed");
	}

	mysql_mutex_unlock(&srv_misc_tmpfile_mutex);
}

/*********************************************************************//**
Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
and displays information about the given transaction.
The caller must release dict_foreign_err_mutex. */
TRANSACTIONAL_TARGET
static
void
row_ins_foreign_trx_print(
/*======================*/
	trx_t*	trx)	/*!< in: transaction */
{
	ulint	n_rec_locks;
	ulint	n_trx_locks;
	ulint	heap_size;

	ut_ad(!srv_read_only_mode);

	{
		TMLockMutexGuard g{SRW_LOCK_CALL};
		n_rec_locks = trx->lock.n_rec_locks;
		n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
		heap_size = mem_heap_get_size(trx->lock.lock_heap);
	}

	mysql_mutex_lock(&dict_foreign_err_mutex);
	rewind(dict_foreign_err_file);
	ut_print_timestamp(dict_foreign_err_file);
	fputs(" Transaction:\n", dict_foreign_err_file);

	trx_print_low(dict_foreign_err_file, trx,
		      n_rec_locks, n_trx_locks, heap_size);

	mysql_mutex_assert_owner(&dict_foreign_err_mutex);
}

/*********************************************************************//**
Reports a foreign key error associated with an update or a delete of a
parent table index entry. */
static
void
row_ins_foreign_report_err(
/*=======================*/
	const char*	errstr,		/*!< in: error string from the viewpoint
					of the parent table */
	que_thr_t*	thr,		/*!< in: query thread whose run_node
					is an update node */
	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
	const rec_t*	rec,		/*!< in: a matching index record in the
					child table */
	const dtuple_t*	entry)		/*!< in: index entry in the parent
					table */
{
	std::string fk_str;

	if (srv_read_only_mode) {
		return;
	}

	FILE*	ef	= dict_foreign_err_file;
	trx_t*	trx	= thr_get_trx(thr);

	row_ins_set_detailed(trx, foreign);

	row_ins_foreign_trx_print(trx);

	fputs("Foreign key constraint fails for table ", ef);
	ut_print_name(ef, trx, foreign->foreign_table_name);
	fputs(":\n", ef);
	fk_str = dict_print_info_on_foreign_key_in_create_format(trx, foreign,
							TRUE);
	fputs(fk_str.c_str(), ef);
	putc('\n', ef);
	fputs(errstr, ef);
	fprintf(ef, " in parent table, in index %s",
		foreign->referenced_index->name());
	if (entry) {
		fputs(" tuple:\n", ef);
		dtuple_print(ef, entry);
	}
	fputs("\nBut in child table ", ef);
	ut_print_name(ef, trx, foreign->foreign_table_name);
	fprintf(ef, ", in index %s", foreign->foreign_index->name());
	if (rec) {
		fputs(", there is a record:\n", ef);
		rec_print(ef, rec, foreign->foreign_index);
	} else {
		fputs(", the record is not available\n", ef);
	}
	putc('\n', ef);

	mysql_mutex_unlock(&dict_foreign_err_mutex);
}

/*********************************************************************//**
Reports a foreign key error to dict_foreign_err_file when we are trying
to add an index entry to a child table. Note that the adding may be the result
of an update, too. */
static
void
row_ins_foreign_report_add_err(
/*===========================*/
	trx_t*		trx,		/*!< in: transaction */
	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
	const rec_t*	rec,		/*!< in: a record in the parent table:
					it does not match entry because we
					have an error! */
	const dtuple_t*	entry)		/*!< in: index entry to insert in the
					child table */
{
	std::string fk_str;

	if (srv_read_only_mode) {
		return;
	}

	FILE*	ef	= dict_foreign_err_file;

	row_ins_set_detailed(trx, foreign);

	row_ins_foreign_trx_print(trx);

	fputs("Foreign key constraint fails for table ", ef);
	ut_print_name(ef, trx, foreign->foreign_table_name);
	fputs(":\n", ef);
	fk_str = dict_print_info_on_foreign_key_in_create_format(trx, foreign,
							TRUE);
	fputs(fk_str.c_str(), ef);
	if (foreign->foreign_index) {
		fprintf(ef, " in parent table, in index %s",
			foreign->foreign_index->name());
	} else {
		fputs(" in parent table", ef);
	}
	if (entry) {
		fputs(" tuple:\n", ef);
		/* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
		It would be better to only display the user columns. */
		dtuple_print(ef, entry);
	}
	fputs("\nBut in parent table ", ef);
	ut_print_name(ef, trx, foreign->referenced_table_name);
	fprintf(ef, ", in index %s,\n"
		"the closest match we can find is record:\n",
		foreign->referenced_index->name());
	if (rec && page_rec_is_supremum(rec)) {
		/* If the cursor ended on a supremum record, it is better
		to report the previous record in the error message, so that
		the user gets a more descriptive error message. */
		rec = page_rec_get_prev_const(rec);
	}

	if (rec) {
		rec_print(ef, rec, foreign->referenced_index);
	}
	putc('\n', ef);

	mysql_mutex_unlock(&dict_foreign_err_mutex);
}

/*********************************************************************//**
Invalidate the query cache for the given table. */
static
void
row_ins_invalidate_query_cache(
/*===========================*/
	que_thr_t*	thr,		/*!< in: query thread whose run_node
					is an update node */
	const char*	name)		/*!< in: table name prefixed with
					database name and a '/' character */
{
	innobase_invalidate_query_cache(thr_get_trx(thr), name);
}

/** Fill virtual column information in cascade node for the child table.
@param[out]	cascade		child update node
@param[in]	rec		clustered rec of child table
@param[in]	index		clustered index of child table
@param[in]	node		parent update node
@param[in]	foreign		foreign key information
@return		error code. */
static
dberr_t
row_ins_foreign_fill_virtual(
	upd_node_t*		cascade,
	const rec_t*		rec,
	dict_index_t*		index,
	upd_node_t*		node,
	dict_foreign_t*		foreign)
{
	THD*		thd = current_thd;
	row_ext_t*	ext;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs_init(offsets_);
	const rec_offs*	offsets =
		rec_get_offsets(rec, index, offsets_, index->n_core_fields,
				ULINT_UNDEFINED, &cascade->heap);
	TABLE*		mysql_table= NULL;
	upd_t*		update = cascade->update;
	ulint		n_v_fld = index->table->n_v_def;
	ulint		n_diff;
	upd_field_t*	upd_field;
	dict_vcol_set*	v_cols = foreign->v_cols;
	update->old_vrow = row_build(
		ROW_COPY_DATA, index, rec,
		offsets, index->table, NULL, NULL,
		&ext, update->heap);
	n_diff = update->n_fields;

	ut_ad(index->table->vc_templ != NULL);

	ib_vcol_row vc(NULL);
	uchar *record = vc.record(thd, index, &mysql_table);
	if (!record) {
		return DB_OUT_OF_MEMORY;
	}
	ut_ad(!node->is_delete
	      || (foreign->type & foreign->DELETE_SET_NULL));
	ut_ad(foreign->type & (foreign->DELETE_SET_NULL
			       | foreign->UPDATE_SET_NULL
			       | foreign->UPDATE_CASCADE));

	for (uint16_t i = 0; i < n_v_fld; i++) {

		dict_v_col_t*     col = dict_table_get_nth_v_col(
				index->table, i);

		dict_vcol_set::iterator it = v_cols->find(col);

		if (it == v_cols->end()) {
			continue;
		}

		dfield_t*	vfield = innobase_get_computed_value(
				update->old_vrow, col, index,
				&vc.heap, update->heap, NULL, thd, mysql_table,
				record, NULL, NULL);

		if (vfield == NULL) {
			return DB_COMPUTE_VALUE_FAILED;
		}

		upd_field = update->fields + n_diff;

		upd_field->old_v_val = static_cast<dfield_t*>(
			mem_heap_alloc(update->heap,
				       sizeof *upd_field->old_v_val));

		dfield_copy(upd_field->old_v_val, vfield);

		upd_field_set_v_field_no(upd_field, i, index);

		dfield_t* new_vfield = innobase_get_computed_value(
				update->old_vrow, col, index,
				&vc.heap, update->heap, NULL, thd,
				mysql_table, record, NULL,
				update);

		if (new_vfield == NULL) {
			return DB_COMPUTE_VALUE_FAILED;
		}

		dfield_copy(&upd_field->new_val, new_vfield);

		if (!dfield_datas_are_binary_equal(
				upd_field->old_v_val,
				&upd_field->new_val, 0))
			n_diff++;
	}

	update->n_fields = n_diff;
	return DB_SUCCESS;
}

#ifdef WITH_WSREP
dberr_t wsrep_append_foreign_key(trx_t *trx,
			       dict_foreign_t*	foreign,
			       const rec_t*	clust_rec,
			       dict_index_t*	clust_index,
			       bool		referenced,
			       upd_node_t*	upd_node,
			       bool		pa_disable,
			       Wsrep_service_key_type	key_type);
#endif /* WITH_WSREP */

/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
and the constraint had an ON DELETE or ON UPDATE condition which was not
RESTRICT.
@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_foreign_check_on_constraint(
/*================================*/
	que_thr_t*	thr,		/*!< in: query thread whose run_node
					is an update node */
	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
					type is != 0 */
	btr_pcur_t*	pcur,		/*!< in: cursor placed on a matching
					index record in the child table */
	dtuple_t*	entry,		/*!< in: index entry in the parent
					table */
	mtr_t*		mtr)		/*!< in: mtr holding the latch of pcur
					page */
{
	upd_node_t*	node;
	upd_node_t*	cascade;
	dict_table_t*const*const fktable = &foreign->foreign_table;
	dict_table_t*	table = *fktable;
	dict_index_t*	index;
	dict_index_t*	clust_index;
	dtuple_t*	ref;
	const rec_t*	rec;
	const rec_t*	clust_rec;
	const buf_block_t* clust_block;
	upd_t*		update;
	dberr_t		err;
	trx_t*		trx;
	mem_heap_t*	tmp_heap	= NULL;
	doc_id_t	doc_id = FTS_NULL_DOC_ID;

	DBUG_ENTER("row_ins_foreign_check_on_constraint");

	trx = thr_get_trx(thr);

	/* Since we are going to delete or update a row, we have to invalidate
	the MySQL query cache for table. A deadlock of threads is not possible
	here because the caller of this function does not hold any latches with
	the mutex rank above the lock_sys.latch. The query cache mutex
	has a rank just above the lock_sys.latch. */

	row_ins_invalidate_query_cache(thr, table->name.m_name);

	node = static_cast<upd_node_t*>(thr->run_node);

	if (node->is_delete && 0 == (foreign->type
				     & (foreign->DELETE_CASCADE
					| foreign->DELETE_SET_NULL))) {

		row_ins_foreign_report_err("Trying to delete",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		DBUG_RETURN(DB_ROW_IS_REFERENCED);
	}

	if (!node->is_delete && 0 == (foreign->type
				      & (foreign->UPDATE_CASCADE
					 | foreign->UPDATE_SET_NULL))) {

		/* This is an UPDATE */

		row_ins_foreign_report_err("Trying to update",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		DBUG_RETURN(DB_ROW_IS_REFERENCED);
	}

	if (node->cascade_node == NULL) {
		node->cascade_heap = mem_heap_create(128);
		node->cascade_node = row_create_update_node_for_mysql(
			table, node->cascade_heap);
		que_node_set_parent(node->cascade_node, node);

	}
	cascade = node->cascade_node;
	cascade->table = table;
	cascade->foreign = foreign;

	if (node->is_delete
	    && (foreign->type & foreign->DELETE_CASCADE)) {
		cascade->is_delete = PLAIN_DELETE;
	} else {
		cascade->is_delete = NO_DELETE;

		if (foreign->n_fields > cascade->update_n_fields) {
			/* We have to make the update vector longer */

			cascade->update = upd_create(foreign->n_fields,
						     node->cascade_heap);
			cascade->update_n_fields = foreign->n_fields;
		}

		/* We do not allow cyclic cascaded updating (DELETE is
		allowed, but not UPDATE) of the same table, as this
		can lead to an infinite cycle. Check that we are not
		updating the same table which is already being
		modified in this cascade chain. We have to check this
		also because the modification of the indexes of a
		'parent' table may still be incomplete, and we must
		avoid seeing the indexes of the parent table in an
		inconsistent state! */

		if (row_ins_cascade_ancestor_updates_table(cascade, table)) {

			/* We do not know if this would break foreign key
			constraints, but play safe and return an error */

			err = DB_ROW_IS_REFERENCED;

			row_ins_foreign_report_err(
				"Trying an update, possibly causing a cyclic"
				" cascaded update\n"
				"in the child table,", thr, foreign,
				btr_pcur_get_rec(pcur), entry);

			goto nonstandard_exit_func;
		}
	}

	if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
		err = DB_FOREIGN_EXCEED_MAX_CASCADE;

		row_ins_foreign_report_err(
			"Trying a too deep cascaded delete or update\n",
			thr, foreign, btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	index = pcur->index();

	ut_a(index == foreign->foreign_index);

	rec = btr_pcur_get_rec(pcur);

	tmp_heap = mem_heap_create(256);

	if (dict_index_is_clust(index)) {
		/* pcur is already positioned in the clustered index of
		the child table */

		clust_index = index;
		clust_rec = rec;
		clust_block = btr_pcur_get_block(pcur);
	} else {
		/* We have to look for the record in the clustered index
		in the child table */

		clust_index = dict_table_get_first_index(table);

		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
					tmp_heap);
		cascade->pcur->old_rec = nullptr;
		cascade->pcur->btr_cur.page_cur.index = clust_index;
		err = btr_pcur_open_with_no_init(ref,
						 PAGE_CUR_LE, BTR_SEARCH_LEAF,
						 cascade->pcur, mtr);
		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
			goto nonstandard_exit_func;
		}

		clust_rec = btr_pcur_get_rec(cascade->pcur);
		clust_block = btr_pcur_get_block(cascade->pcur);

		if (!page_rec_is_user_rec(clust_rec)
		    || btr_pcur_get_low_match(cascade->pcur)
		    < dict_index_get_n_unique(clust_index)) {

			ib::error() << "In cascade of a foreign key op index "
				<< index->name
				<< " of table " << index->table->name;

			fputs("InnoDB: record ", stderr);
			rec_print(stderr, rec, index);
			fputs("\n"
			      "InnoDB: clustered record ", stderr);
			rec_print(stderr, clust_rec, clust_index);
			fputs("\n"
			      "InnoDB: Submit a detailed bug report to"
			      " https://jira.mariadb.org/\n", stderr);
			ut_ad(0);
			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}
	}

	/* Set an X-lock on the row to delete or update in the child table */

	err = lock_table(table, fktable, LOCK_IX, thr);

	if (err == DB_SUCCESS) {
		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
		we already have a normal shared lock on the appropriate
		gap if the search criterion was not unique */

		err = lock_clust_rec_read_check_and_lock_alt(
			0, clust_block, clust_rec, clust_index,
			LOCK_X, LOCK_REC_NOT_GAP, thr);
	}

	if (err != DB_SUCCESS) {

		goto nonstandard_exit_func;
	}

	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
		/* In delete-marked records, DB_TRX_ID must
		always refer to an existing undo log record. */
		ut_ad(rec_get_trx_id(clust_rec, clust_index));
		/* This can happen if there is a circular reference of
		rows such that cascading delete comes to delete a row
		already in the process of being delete marked */
		err = DB_SUCCESS;

		goto nonstandard_exit_func;
	}

	if (table->fts) {
		doc_id = fts_get_doc_id_from_rec(
			clust_rec, clust_index,
			rec_get_offsets(clust_rec, clust_index, NULL,
					clust_index->n_core_fields,
					ULINT_UNDEFINED, &tmp_heap));
	}

	if (node->is_delete
	    ? (foreign->type & foreign->DELETE_SET_NULL)
	    : (foreign->type & foreign->UPDATE_SET_NULL)) {
		/* Build the appropriate update vector which sets
		foreign->n_fields first fields in rec to SQL NULL */

		update = cascade->update;

		update->info_bits = 0;
		update->n_fields = foreign->n_fields;
		MEM_UNDEFINED(update->fields,
			      update->n_fields * sizeof *update->fields);

		for (ulint i = 0; i < foreign->n_fields; i++) {
			upd_field_t*	ufield = &update->fields[i];
			ulint		col_no = dict_index_get_nth_col_no(
						index, i);
			ulint		prefix_col;

			ufield->field_no = static_cast<uint16_t>(
				dict_table_get_nth_col_pos(
					table, col_no, &prefix_col));
			dict_col_t*	col = dict_table_get_nth_col(
				table, col_no);
			dict_col_copy_type(col, dfield_get_type(&ufield->new_val));

			ufield->orig_len = 0;
			ufield->exp = NULL;
			dfield_set_null(&ufield->new_val);
		}

		if (foreign->affects_fulltext()) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}

		if (foreign->v_cols != NULL
		    && foreign->v_cols->size() > 0) {
			err = row_ins_foreign_fill_virtual(
				cascade, clust_rec, clust_index,
				node, foreign);

			if (err != DB_SUCCESS) {
				goto nonstandard_exit_func;
			}
		}
	} else if (table->fts && cascade->is_delete == PLAIN_DELETE
		   && foreign->affects_fulltext()) {
		/* dict_foreign_t::DELETE_CASCADE case */
		fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
	}

	if (!node->is_delete
	    && (foreign->type & foreign->UPDATE_CASCADE)) {

		/* Build the appropriate update vector which sets changing
		foreign->n_fields first fields in rec to new values */

		bool affects_fulltext = row_ins_cascade_calc_update_vec(
			node, foreign, tmp_heap, trx);

		if (foreign->v_cols && !foreign->v_cols->empty()) {
			err = row_ins_foreign_fill_virtual(
				cascade, clust_rec, clust_index,
				node, foreign);

			if (err != DB_SUCCESS) {
				goto nonstandard_exit_func;
			}
		}

		switch (cascade->update->n_fields) {
		case ULINT_UNDEFINED:
			err = DB_ROW_IS_REFERENCED;

			row_ins_foreign_report_err(
				"Trying a cascaded update where the"
				" updated value in the child\n"
				"table would not fit in the length"
				" of the column, or the value would\n"
				"be NULL and the column is"
				" declared as not NULL in the child table,",
				thr, foreign, btr_pcur_get_rec(pcur), entry);

			goto nonstandard_exit_func;
		case 0:
			/* The update does not change any columns referred
			to in this foreign key constraint: no need to do
			anything */

			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}

		/* Mark the old Doc ID as deleted */
		if (affects_fulltext) {
			ut_ad(table->fts);
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	if (table->versioned() && cascade->is_delete != PLAIN_DELETE
	    && cascade->update->affects_versioned()) {
		ut_ad(!cascade->historical_heap);
		cascade->historical_heap = mem_heap_create(srv_page_size);
		cascade->historical_row = row_build(
			ROW_COPY_DATA, clust_index, clust_rec, NULL, table,
			NULL, NULL, NULL, cascade->historical_heap);
	}

	/* Store pcur position and initialize or store the cascade node
	pcur stored position */

	btr_pcur_store_position(pcur, mtr);

	if (index == clust_index) {
		btr_pcur_copy_stored_position(cascade->pcur, pcur);
	} else {
		btr_pcur_store_position(cascade->pcur, mtr);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep()) {
		err = wsrep_append_foreign_key(trx, foreign, clust_rec, clust_index,
					       false, NULL, true,
					       WSREP_SERVICE_KEY_EXCLUSIVE);
		if (err != DB_SUCCESS) {
			goto nonstandard_exit_func;
		}
	}
#endif /* WITH_WSREP */
	mtr_commit(mtr);

	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);

	cascade->state = UPD_NODE_UPDATE_CLUSTERED;

	err = row_update_cascade_for_mysql(thr, cascade,
					   foreign->foreign_table);

	mtr_start(mtr);

	/* Restore pcur position */

	if (pcur->restore_position(BTR_SEARCH_LEAF, mtr)
	    != btr_pcur_t::SAME_ALL) {
		err = DB_CORRUPTION;
	}

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	DBUG_RETURN(err);

nonstandard_exit_func:

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	btr_pcur_store_position(pcur, mtr);

	mtr_commit(mtr);
	mtr_start(mtr);

	if (pcur->restore_position(BTR_SEARCH_LEAF, mtr)
	    != btr_pcur_t::SAME_ALL && err == DB_SUCCESS) {
		err = DB_CORRUPTION;
	}

	DBUG_RETURN(err);
}

/*********************************************************************//**
Sets a shared lock on a record. Used in locking possible duplicate key
records and also in checking foreign key constraints.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
static
dberr_t
row_ins_set_shared_rec_lock(
/*========================*/
	unsigned		type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP type lock */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: record */
	dict_index_t*		index,	/*!< in: index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;

	ut_ad(rec_offs_validate(rec, index, offsets));

	if (dict_index_is_clust(index)) {
		err = lock_clust_rec_read_check_and_lock(
			0, block, rec, index, offsets, LOCK_S, type, thr);
	} else {
		err = lock_sec_rec_read_check_and_lock(
			0, block, rec, index, offsets, LOCK_S, type, thr);
	}

	return(err);
}

/*********************************************************************//**
Sets a exclusive lock on a record. Used in locking possible duplicate key
records
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
static
dberr_t
row_ins_set_exclusive_rec_lock(
/*===========================*/
	unsigned		type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP type lock */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: record */
	dict_index_t*		index,	/*!< in: index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;

	ut_ad(rec_offs_validate(rec, index, offsets));

	if (dict_index_is_clust(index)) {
		err = lock_clust_rec_read_check_and_lock(
			0, block, rec, index, offsets, LOCK_X, type, thr);
	} else {
		err = lock_sec_rec_read_check_and_lock(
			0, block, rec, index, offsets, LOCK_X, type, thr);
	}

	return(err);
}

/***************************************************************//**
Checks if foreign key constraint fails for an index entry. Sets shared locks
which lock either the success or the failure of the constraint. NOTE that
the caller must have a shared latch on dict_sys.latch.
@return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
dberr_t
row_ins_check_foreign_constraint(
/*=============================*/
	ibool		check_ref,/*!< in: TRUE if we want to check that
				the referenced table is ok, FALSE if we
				want to check the foreign key table */
	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
				tables mentioned in it must be in the
				dictionary cache if they exist at all */
	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
				table, else the referenced table */
	dtuple_t*	entry,	/*!< in: index entry for index */
	que_thr_t*	thr)	/*!< in: query thread */
{
	upd_node_t*	upd_node;
	ulint		n_fields_cmp;
	btr_pcur_t	pcur;
	int		cmp;
	mtr_t		mtr;
	trx_t*		trx		= thr_get_trx(thr);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;

	bool		skip_gap_lock;

	skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED);

	DBUG_ENTER("row_ins_check_foreign_constraint");

	rec_offs_init(offsets_);

#ifdef WITH_WSREP
	upd_node= NULL;
#endif /* WITH_WSREP */

	if (!trx->check_foreigns) {
		/* The user has suppressed foreign key checks currently for
		this session */
		DBUG_RETURN(DB_SUCCESS);
	}

	/* If any of the foreign key fields in entry is SQL NULL, we
	suppress the foreign key check: this is compatible with Oracle,
	for example */
	for (ulint i = 0; i < entry->n_fields; i++) {
		dfield_t* field = dtuple_get_nth_field(entry, i);
		if (i < foreign->n_fields && dfield_is_null(field)) {
			DBUG_RETURN(DB_SUCCESS);
		}
		/* System Versioning: if row_end != Inf, we
		suppress the foreign key check */
		if (field->type.vers_sys_end() && field->vers_history_row()) {
			DBUG_RETURN(DB_SUCCESS);
		}
	}

	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
		upd_node = static_cast<upd_node_t*>(thr->run_node);

		if (upd_node->is_delete != PLAIN_DELETE
		    && upd_node->foreign == foreign) {
			/* If a cascaded update is done as defined by a
			foreign key constraint, do not check that
			constraint for the child row. In ON UPDATE CASCADE
			the update of the parent row is only half done when
			we come here: if we would check the constraint here
			for the child row it would fail.

			A QUESTION remains: if in the child table there are
			several constraints which refer to the same parent
			table, we should merge all updates to the child as
			one update? And the updates can be contradictory!
			Currently we just perform the update associated
			with each foreign key constraint, one after
			another, and the user has problems predicting in
			which order they are performed. */

			DBUG_RETURN(DB_SUCCESS);
		}
	}

	if (que_node_get_type(thr->run_node) == QUE_NODE_INSERT) {
		ins_node_t* insert_node =
			static_cast<ins_node_t*>(thr->run_node);
		dict_table_t* table = insert_node->index->table;
		if (table->versioned()) {
			dfield_t* row_end = dtuple_get_nth_field(
				insert_node->row, table->vers_end);
			if (row_end->vers_history_row()) {
				DBUG_RETURN(DB_SUCCESS);
			}
		}
	}

	dict_table_t *check_table;
	dict_index_t *check_index;
	dberr_t err = DB_SUCCESS;

	{
		dict_table_t*& fktable = check_ref
			? foreign->referenced_table : foreign->foreign_table;
		check_table = fktable;
		if (check_table) {
			err = lock_table(check_table, &fktable, LOCK_IS, thr);
			if (err != DB_SUCCESS) {
				goto do_possible_lock_wait;
			}
		}
		check_table = fktable;
	}

	check_index = check_ref
		? foreign->referenced_index : foreign->foreign_index;

	if (!check_table || !check_table->is_readable() || !check_index) {
		FILE*	ef = dict_foreign_err_file;
		std::string fk_str;

		row_ins_set_detailed(trx, foreign);
		row_ins_foreign_trx_print(trx);

		fputs("Foreign key constraint fails for table ", ef);
		ut_print_name(ef, trx, check_ref
			      ? foreign->foreign_table_name
			      : foreign->referenced_table_name);
		fputs(":\n", ef);
		fk_str = dict_print_info_on_foreign_key_in_create_format(
			trx, foreign, TRUE);
		fputs(fk_str.c_str(), ef);
		if (check_ref) {
			if (foreign->foreign_index) {
				fprintf(ef, "\nTrying to add to index %s"
					" tuple:\n",
					foreign->foreign_index->name());
			} else {
				fputs("\nTrying to add tuple:\n", ef);
			}
			dtuple_print(ef, entry);
			fputs("\nBut the parent table ", ef);
			ut_print_name(ef, trx, foreign->referenced_table_name);
			fputs("\nor its .ibd file or the required index does"
			      " not currently exist!\n", ef);
			err = DB_NO_REFERENCED_ROW;
		} else {
			if (foreign->referenced_index) {
				fprintf(ef, "\nTrying to modify index %s"
					" tuple:\n",
					foreign->referenced_index->name());
			} else {
				fputs("\nTrying to modify tuple:\n", ef);
			}
			dtuple_print(ef, entry);
			fputs("\nBut the referencing table ", ef);
			ut_print_name(ef, trx, foreign->foreign_table_name);
			fputs("\nor its .ibd file or the required index does"
			      " not currently exist!\n", ef);
			err = DB_ROW_IS_REFERENCED;
		}

		mysql_mutex_unlock(&dict_foreign_err_mutex);
		goto exit_func;
	}

	mtr_start(&mtr);

	/* Store old value on n_fields_cmp */

	n_fields_cmp = dtuple_get_n_fields_cmp(entry);

	dtuple_set_n_fields_cmp(entry, foreign->n_fields);
	pcur.btr_cur.page_cur.index = check_index;
	err = btr_pcur_open(entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
		goto end_scan;
	}

	/* Scan index records and check if there is a matching record */

	do {
		const rec_t*		rec = btr_pcur_get_rec(&pcur);
		const buf_block_t*	block = btr_pcur_get_block(&pcur);

		if (page_rec_is_infimum_low(rec - block->page.frame)) {

			continue;
		}

		offsets = rec_get_offsets(rec, check_index, offsets,
					  check_index->n_core_fields,
					  ULINT_UNDEFINED, &heap);

		if (page_rec_is_supremum_low(rec - block->page.frame)) {

			if (skip_gap_lock) {

				continue;
			}

			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
							  rec, check_index,
							  offsets, thr);
			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				continue;
			default:
				goto end_scan;
			}
		}

		cmp = cmp_dtuple_rec(entry, rec, check_index, offsets);

		if (cmp == 0) {
			if (rec_get_deleted_flag(rec,
						 rec_offs_comp(offsets))) {
				/* In delete-marked records, DB_TRX_ID must
				always refer to an existing undo log record. */
				ut_ad(!dict_index_is_clust(check_index)
				      || row_get_rec_trx_id(rec, check_index,
							    offsets));

				err = row_ins_set_shared_rec_lock(
					skip_gap_lock
					? LOCK_REC_NOT_GAP
					: LOCK_ORDINARY, block,
					rec, check_index, offsets, thr);
				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}
			} else {
				if (check_table->versioned()) {
					bool history_row = false;

					if (check_index->is_primary()) {
						history_row = check_index->
							vers_history_row(rec,
									 offsets);
					} else if (check_index->
						vers_history_row(rec,
								 history_row)) {
						break;
					}

					if (history_row) {
						continue;
					}
				}
				/* Found a matching record. Lock only
				a record because we can allow inserts
				into gaps */

				err = row_ins_set_shared_rec_lock(
					LOCK_REC_NOT_GAP, block,
					rec, check_index, offsets, thr);

				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}

				if (check_ref) {
					err = DB_SUCCESS;
#ifdef WITH_WSREP
					if (trx->is_wsrep()) {
						err = wsrep_append_foreign_key(
							thr_get_trx(thr),
							foreign,
							rec,
							check_index,
							check_ref,
							upd_node,
						        false,
						        WSREP_SERVICE_KEY_REFERENCE);
					}
#endif /* WITH_WSREP */
					goto end_scan;
				} else if (foreign->type != 0) {
					/* There is an ON UPDATE or ON DELETE
					condition: check them in a separate
					function */

					err = row_ins_foreign_check_on_constraint(
						thr, foreign, &pcur, entry,
						&mtr);
					if (err != DB_SUCCESS) {
						/* Since reporting a plain
						"duplicate key" error
						message to the user in
						cases where a long CASCADE
						operation would lead to a
						duplicate key in some
						other table is very
						confusing, map duplicate
						key errors resulting from
						FK constraints to a
						separate error code. */

						if (err == DB_DUPLICATE_KEY) {
							err = DB_FOREIGN_DUPLICATE_KEY;
						}

						goto end_scan;
					}

					/* row_ins_foreign_check_on_constraint
					may have repositioned pcur on a
					different block */
					block = btr_pcur_get_block(&pcur);
				} else {
					row_ins_foreign_report_err(
						"Trying to delete or update",
						thr, foreign, rec, entry);

					err = DB_ROW_IS_REFERENCED;
					goto end_scan;
				}
			}
		} else {
			ut_a(cmp < 0);

			err = skip_gap_lock
				? DB_SUCCESS
				: row_ins_set_shared_rec_lock(
					LOCK_GAP, block,
					rec, check_index, offsets, thr);

			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
				err = DB_SUCCESS;
				/* fall through */
			case DB_SUCCESS:
				if (check_ref) {
					err = DB_NO_REFERENCED_ROW;
					row_ins_foreign_report_add_err(
						trx, foreign, rec, entry);
				}
			default:
				break;
			}

			goto end_scan;
		}
	} while (btr_pcur_move_to_next(&pcur, &mtr));

	if (check_ref) {
		row_ins_foreign_report_add_err(
			trx, foreign, btr_pcur_get_rec(&pcur), entry);
		err = DB_NO_REFERENCED_ROW;
	} else {
		err = DB_SUCCESS;
	}

end_scan:
	mtr_commit(&mtr);
	ut_free(pcur.old_rec_buf);

	/* Restore old value */
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);

do_possible_lock_wait:
	if (err == DB_LOCK_WAIT) {
		trx->error_state = err;

		thr->lock_state = QUE_THR_LOCK_ROW;

		err = lock_wait(thr);

		thr->lock_state = QUE_THR_LOCK_NOLOCK;

		if (err == DB_SUCCESS) {
			err = DB_LOCK_WAIT;
		}
	}

exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	DBUG_RETURN(err);
}

/** Sets the values of the dtuple fields in ref_entry from the values of
foreign columns in entry.
@param[in]	foreign		foreign key constraint
@param[in]	index		clustered index
@param[in]	entry		tuple of clustered index
@param[in]	ref_entry	tuple of foreign columns
@return true if all foreign key fields present in clustered index */
static
bool row_ins_foreign_index_entry(dict_foreign_t *foreign,
                                 const dict_index_t *index,
                                 const dtuple_t *entry,
                                 dtuple_t *ref_entry)
{
  for (ulint i= 0; i < foreign->n_fields; i++)
  {
    for (ulint j= 0; j < index->n_fields; j++)
    {
      const dict_col_t *col= dict_index_get_nth_col(index, j);

      /* A clustered index may contain instantly dropped columns,
      which must be skipped. */
      if (col->is_dropped())
        continue;

      const Lex_ident_column col_name=
        dict_table_get_col_name(index->table, col->ind);
      if (col_name.streq(Lex_cstring_strlen(foreign->foreign_col_names[i])))
      {
        dfield_copy(&ref_entry->fields[i], &entry->fields[j]);
        goto got_match;
      }
    }
    return false;
got_match:
    continue;
  }

  return true;
}

/***************************************************************//**
Checks if foreign key constraints fail for an index entry. If index
is not mentioned in any constraint, this function does nothing,
Otherwise does searches to the indexes of referenced tables and
sets shared locks which lock either the success or the failure of
a constraint.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_check_foreign_constraints(
/*==============================*/
	dict_table_t*	table,	/*!< in: table */
	dict_index_t*	index,	/*!< in: index */
	bool		pk,	/*!< in: index->is_primary() */
	dtuple_t*	entry,	/*!< in: index entry for index */
	que_thr_t*	thr)	/*!< in: query thread */
{
	dict_foreign_t*	foreign;
	dberr_t		err = DB_SUCCESS;
	mem_heap_t*	heap = NULL;

	DBUG_ASSERT(index->is_primary() == pk);

	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
			    "foreign_constraint_check_for_ins");

	for (dict_foreign_set::iterator it = table->foreign_set.begin();
	     err == DB_SUCCESS && it != table->foreign_set.end();
	     ++it) {

		foreign = *it;

		if (foreign->foreign_index == index
		    || (pk && !foreign->foreign_index)) {

			dtuple_t*	ref_tuple = entry;
			if (UNIV_UNLIKELY(!foreign->foreign_index)) {
				/* Change primary key entry to
				foreign key index entry */
				if (!heap) {
					heap = mem_heap_create(1000);
				} else {
					mem_heap_empty(heap);
				}

				ref_tuple = dtuple_create(
					heap, foreign->n_fields);
				dtuple_set_n_fields_cmp(
					ref_tuple, foreign->n_fields);
				if (!row_ins_foreign_index_entry(
					foreign, index, entry, ref_tuple)) {
					err = DB_NO_REFERENCED_ROW;
					break;
				}

			}

			dict_table_t*	ref_table = NULL;
			dict_table_t*	referenced_table
						= foreign->referenced_table;

			if (referenced_table == NULL) {

				ref_table = dict_table_open_on_name(
					foreign->referenced_table_name_lookup,
					false, DICT_ERR_IGNORE_NONE);
			}

			err = row_ins_check_foreign_constraint(
				TRUE, foreign, table, ref_tuple, thr);

			if (ref_table) {
				dict_table_close(ref_table);
			}
		}
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	return err;
}

/***************************************************************//**
Checks if a unique key violation to rec would occur at the index entry
insert.
@return TRUE if error */
static
ibool
row_ins_dupl_error_with_rec(
/*========================*/
	const rec_t*	rec,	/*!< in: user record; NOTE that we assume
				that the caller already has a record lock on
				the record! */
	const dtuple_t*	entry,	/*!< in: entry to insert */
	dict_index_t*	index,	/*!< in: index */
	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
{
	ulint	i;

	ut_ad(rec_offs_validate(rec, index, offsets));

	const auto n_unique = dict_index_get_n_unique(index);

	uint16_t matched_fields = 0;

	cmp_dtuple_rec_with_match(entry, rec, index, offsets, &matched_fields);

	if (matched_fields < n_unique) {

		return(FALSE);
	}

	/* In a unique secondary index we allow equal key values if they
	contain SQL NULLs */

	if (!dict_index_is_clust(index)) {

		for (i = 0; i < n_unique; i++) {
			if (dfield_is_null(dtuple_get_nth_field(entry, i))) {

				return(FALSE);
			}
		}
	}

	return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
}

/** Determine whether a history row was inserted by this transaction
(row TRX_ID is the same as current TRX_ID).
@param index  secondary index
@param rec    secondary index record
@param trx    transaction
@return error code
@retval DB_SUCCESS                on success
@retval DB_FOREIGN_DUPLICATE_KEY  if a history row was inserted by trx */
static dberr_t vers_row_same_trx(dict_index_t* index, const rec_t* rec,
                                 const trx_t& trx)
{
  mtr_t mtr;
  dberr_t ret= DB_SUCCESS;
  dict_index_t *clust_index= dict_table_get_first_index(index->table);
  ut_ad(index != clust_index);

  mtr.start();

  if (const rec_t *clust_rec=
      row_get_clust_rec(BTR_SEARCH_LEAF, rec, index, &clust_index, &mtr))
  {
    rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
    rec_offs *clust_offs= offsets_;
    rec_offs_init(offsets_);
    mem_heap_t *heap= NULL;

    clust_offs=
      rec_get_offsets(clust_rec, clust_index, clust_offs,
                      clust_index->n_core_fields, ULINT_UNDEFINED, &heap);
    if (clust_index->vers_history_row(clust_rec, clust_offs))
    {
      ulint trx_id_len;
      const byte *trx_id= rec_get_nth_field(clust_rec, clust_offs,
                                            clust_index->n_uniq, &trx_id_len);
      ut_ad(trx_id_len == DATA_TRX_ID_LEN);

      if (trx.id == trx_read_trx_id(trx_id))
        ret= DB_FOREIGN_DUPLICATE_KEY;
    }

    if (UNIV_LIKELY_NULL(heap))
      mem_heap_free(heap);
  }
  else
  {
    ib::error() << "foreign constraints: secondary index " << index->name <<
                   " of table " << index->table->name << " is out of sync";
    ut_ad("secondary index is out of sync" == 0);
    ret= DB_TABLE_CORRUPT;
  }

  mtr.commit();
  return ret;
}

/***************************************************************//**
Scans a unique non-clustered index at a given index entry to determine
whether a uniqueness violation has occurred for the key value of the entry.
Set shared locks on possible duplicate records.
@return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_scan_sec_index_for_duplicate(
/*=================================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	dict_index_t*	index,	/*!< in: non-clustered unique index */
	dtuple_t*	entry,	/*!< in: index entry */
	que_thr_t*	thr,	/*!< in: query thread */
	mtr_t*		mtr,	/*!< in/out: mini-transaction */
	mem_heap_t*	offsets_heap)
				/*!< in/out: memory heap that can be emptied */
{
	int		cmp;
	ulint		n_fields_cmp;
	btr_pcur_t	pcur;
	rec_offs	offsets_[REC_OFFS_SEC_INDEX_SIZE];
	rec_offs*	offsets		= offsets_;
	DBUG_ENTER("row_ins_scan_sec_index_for_duplicate");

	rec_offs_init(offsets_);

	ut_ad(!index->lock.have_any());

	const auto n_unique = dict_index_get_n_unique(index);

	/* If the secondary index is unique, but one of the fields in the
	n_unique first fields is NULL, a unique key violation cannot occur,
	since we define NULL != NULL in this case */
	if (index->n_nullable && dtuple_contains_null(entry, n_unique))
		DBUG_RETURN(DB_SUCCESS);

	/* Store old value on n_fields_cmp */

	n_fields_cmp = dtuple_get_n_fields_cmp(entry);

	dtuple_set_n_fields_cmp(entry, n_unique);
	pcur.btr_cur.page_cur.index = index;
	trx_t* const trx = thr_get_trx(thr);
	dberr_t err = btr_pcur_open(entry, PAGE_CUR_GE, BTR_SEARCH_LEAF,
				    &pcur, mtr);
	if (err != DB_SUCCESS) {
		goto end_scan;
	}

	/* Scan index records and check if there is a duplicate */

	do {
		const rec_t*		rec	= btr_pcur_get_rec(&pcur);
		const buf_block_t*	block	= btr_pcur_get_block(&pcur);
		const ulint		lock_type = LOCK_ORDINARY;

		if (page_rec_is_infimum_low(rec - block->page.frame)) {

			continue;
		}

		offsets = rec_get_offsets(rec, index, offsets,
					  index->n_core_fields,
					  ULINT_UNDEFINED, &offsets_heap);

		if (flags & BTR_NO_LOCKING_FLAG) {
			/* Set no locks when applying log
			in online table rebuild. */
		} else if (trx->duplicates) {

			/* If the SQL-query will update or replace
			duplicate key we will take X-lock for
			duplicates ( REPLACE, LOAD DATAFILE REPLACE,
			INSERT ON DUPLICATE KEY UPDATE). */

			err = row_ins_set_exclusive_rec_lock(
				lock_type, block, rec, index, offsets, thr);
		} else {

			err = row_ins_set_shared_rec_lock(
				lock_type, block, rec, index, offsets, thr);
		}

		switch (err) {
		case DB_SUCCESS_LOCKED_REC:
			err = DB_SUCCESS;
		case DB_SUCCESS:
			break;
		default:
			goto end_scan;
		}

		if (page_rec_is_supremum_low(rec - block->page.frame)) {

			continue;
		}

		cmp = cmp_dtuple_rec(entry, rec, index, offsets);

		if (cmp == 0) {
			if (row_ins_dupl_error_with_rec(rec, entry,
							index, offsets)) {

				err = DB_DUPLICATE_KEY;

				trx->error_info = index;

				if (!index->table->versioned()) {
				} else if (dberr_t e =
					   vers_row_same_trx(index, rec,
							     *trx)) {
					err = e;
					goto end_scan;
				}

				/* If the duplicate is on hidden FTS_DOC_ID,
				state so in the error log */
				if (index == index->table->fts_doc_id_index
				    && DICT_TF2_FLAG_IS_SET(
					index->table,
					DICT_TF2_FTS_HAS_DOC_ID)) {

					ib::error() << "Duplicate FTS_DOC_ID"
						" value on table "
						<< index->table->name;
				}

				goto end_scan;
			}
		} else {
			ut_a(cmp < 0);
			goto end_scan;
		}
	} while (btr_pcur_move_to_next(&pcur, mtr));

end_scan:
	/* Restore old value */
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);

	DBUG_RETURN(err);
}

/** Checks for a duplicate when the table is being rebuilt online.
@param n_uniq   index->db_trx_id()
@param entry    entry being inserted
@param rec      clustered index record at insert position
@param index    clustered index
@param offsets  rec_get_offsets(rec)
@retval DB_SUCCESS when no duplicate is detected
@retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
a newer version of entry (the entry should not be inserted)
@retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_duplicate_online(ulint n_uniq, const dtuple_t *entry,
                         const rec_t *rec, const dict_index_t *index,
                         rec_offs *offsets)
{
	uint16_t fields	= 0;

	/* During rebuild, there should not be any delete-marked rows
	in the new table. */
	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
	ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
	ut_ad(n_uniq == index->db_trx_id());

	/* Compare the PRIMARY KEY fields and the DB_TRX_ID, DB_ROLL_PTR. */
	cmp_dtuple_rec_with_match_low(entry, rec, index, offsets, n_uniq + 2,
				      &fields);

	if (fields < n_uniq) {
		/* Not a duplicate. */
		return(DB_SUCCESS);
	}

	ulint trx_id_len;

	if (fields == n_uniq + 2
	    && memcmp(rec_get_nth_field(rec, offsets, n_uniq, &trx_id_len),
		      reset_trx_id, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
		ut_ad(trx_id_len == DATA_TRX_ID_LEN);
		/* rec is an exact match of entry, and DB_TRX_ID belongs
		to a transaction that started after our ALTER TABLE. */
		return(DB_SUCCESS_LOCKED_REC);
	}

	return(DB_DUPLICATE_KEY);
}

/** Checks for a duplicate when the table is being rebuilt online.
@retval DB_SUCCESS when no duplicate is detected
@retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
a newer version of entry (the entry should not be inserted)
@retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_duplicate_error_in_clust_online(
/*====================================*/
	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
	const btr_cur_t*cursor,	/*!< in: cursor on insert position */
	rec_offs**	offsets,/*!< in/out: rec_get_offsets(rec) */
	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
{
	dberr_t		err	= DB_SUCCESS;
	const rec_t*	rec	= btr_cur_get_rec(cursor);

	ut_ad(!cursor->index()->is_instant());

	if (cursor->low_match >= n_uniq
	    && !page_rec_is_infimum_low(rec - btr_cur_get_page(cursor))) {
		*offsets = rec_get_offsets(rec, cursor->index(), *offsets,
					   cursor->index()->n_fields,
					   ULINT_UNDEFINED, heap);
		err = row_ins_duplicate_online(n_uniq, entry,
					       rec, cursor->index(), *offsets);
		if (err != DB_SUCCESS) {
			return(err);
		}
	}

	if (!(rec = page_rec_get_next_const(rec))) {
		return DB_CORRUPTION;
	}

	if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
		*offsets = rec_get_offsets(rec, cursor->index(), *offsets,
					   cursor->index()->n_fields,
					   ULINT_UNDEFINED, heap);
		err = row_ins_duplicate_online(n_uniq, entry,
					       rec, cursor->index(), *offsets);
	}

	return(err);
}

/***************************************************************//**
Checks if a unique key violation error would occur at an index entry
insert. Sets shared locks on possible duplicate records. Works only
for a clustered index!
@retval DB_SUCCESS if no error
@retval DB_DUPLICATE_KEY if error,
@retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
record */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_duplicate_error_in_clust(
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
	const dtuple_t*	entry,	/*!< in: entry to insert */
	que_thr_t*	thr)	/*!< in: query thread */
{
	dberr_t	err;
	rec_t*	rec;
	ulint	n_unique;
	trx_t*	trx		= thr_get_trx(thr);
	mem_heap_t*heap		= NULL;
	rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs* offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(cursor->index()->is_clust());

	/* NOTE: For unique non-clustered indexes there may be any number
	of delete marked records with the same value for the non-clustered
	index key (remember multiversioning), and which differ only in
	the row refererence part of the index record, containing the
	clustered index key fields. For such a secondary index record,
	to avoid race condition, we must FIRST do the insertion and after
	that check that the uniqueness condition is not breached! */

	/* NOTE: A problem is that in the B-tree node pointers on an
	upper level may match more to the entry than the actual existing
	user records on the leaf level. So, even if low_match would suggest
	that a duplicate key violation may occur, this may not be the case. */

	n_unique = dict_index_get_n_unique(cursor->index());

	if (cursor->low_match >= n_unique) {

		rec = btr_cur_get_rec(cursor);

		if (!page_rec_is_infimum_low(rec - btr_cur_get_page(cursor))) {
			offsets = rec_get_offsets(rec, cursor->index(),
						  offsets,
						  cursor->index()
						  ->n_core_fields,
						  ULINT_UNDEFINED, &heap);

			/* We set a lock on the possible duplicate: this
			is needed in logical logging of MySQL to make
			sure that in roll-forward we get the same duplicate
			errors as in original execution */

			if (flags & BTR_NO_LOCKING_FLAG) {
				/* Do nothing if no-locking is set */
				err = DB_SUCCESS;
			} else if (trx->duplicates) {

				/* If the SQL-query will update or replace
				duplicate key we will take X-lock for
				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
				INSERT ON DUPLICATE KEY UPDATE). */

				err = row_ins_set_exclusive_rec_lock(
					LOCK_REC_NOT_GAP,
					btr_cur_get_block(cursor),
					rec, cursor->index(), offsets, thr);
			} else {

				err = row_ins_set_shared_rec_lock(
					LOCK_REC_NOT_GAP,
					btr_cur_get_block(cursor), rec,
					cursor->index(), offsets, thr);
			}

			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				break;
			default:
				goto func_exit;
			}

			if (row_ins_dupl_error_with_rec(
				    rec, entry, cursor->index(), offsets)) {
duplicate:
				trx->error_info = cursor->index();
				err = DB_DUPLICATE_KEY;
				if (thr->prebuilt
				    && thr->prebuilt->upd_node
				    && thr->prebuilt->upd_node->is_delete
					== VERSIONED_DELETE
				    && entry->vers_history_row())
				{
					ulint trx_id_len;
					byte *trx_id = rec_get_nth_field(
						rec, offsets, n_unique,
						&trx_id_len);
					ut_ad(trx_id_len == DATA_TRX_ID_LEN);
					if (trx->id == trx_read_trx_id(trx_id)) {
						err = DB_FOREIGN_DUPLICATE_KEY;
					}
				}
				goto func_exit;
			}
		}
	}

	err = DB_SUCCESS;

	if (cursor->up_match >= n_unique) {

		rec = page_rec_get_next(btr_cur_get_rec(cursor));

		if (rec && !page_rec_is_supremum(rec)) {
			offsets = rec_get_offsets(rec, cursor->index(),
						  offsets,
						  cursor->index()
						  ->n_core_fields,
						  ULINT_UNDEFINED, &heap);

			if (trx->duplicates) {

				/* If the SQL-query will update or replace
				duplicate key we will take X-lock for
				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
				INSERT ON DUPLICATE KEY UPDATE). */

				err = row_ins_set_exclusive_rec_lock(
					LOCK_REC_NOT_GAP,
					btr_cur_get_block(cursor),
					rec, cursor->index(), offsets, thr);
			} else {

				err = row_ins_set_shared_rec_lock(
					LOCK_REC_NOT_GAP,
					btr_cur_get_block(cursor),
					rec, cursor->index(), offsets, thr);
			}

			switch (err) {
			default:
				break;
			case DB_SUCCESS_LOCKED_REC:
				err = DB_SUCCESS;
				/* fall through */
			case DB_SUCCESS:
				if (row_ins_dupl_error_with_rec(
					    rec, entry, cursor->index(),
					    offsets)) {
					goto duplicate;
				}
			}
		}

		/* This should never happen */
		err = DB_CORRUPTION;
	}
func_exit:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(err);
}

/***************************************************************//**
Checks if an index entry has long enough common prefix with an
existing record so that the intended insert of the entry must be
changed to a modify of the existing record. In the case of a clustered
index, the prefix must be n_unique fields long. In the case of a
secondary index, all fields must be equal.  InnoDB never updates
secondary index records in place, other than clearing or setting the
delete-mark flag. We could be able to update the non-unique fields
of a unique secondary index record by checking the cursor->up_match,
but we do not do so, because it could have some locking implications.
@return TRUE if the existing record should be updated; FALSE if not */
UNIV_INLINE
ibool
row_ins_must_modify_rec(
/*====================*/
	const btr_cur_t*	cursor)	/*!< in: B-tree cursor */
{
	/* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
	Because node pointers on upper levels of the B-tree may match more
	to entry than to actual user records on the leaf level, we
	have to check if the candidate record is actually a user record.
	A clustered index node pointer contains index->n_unique first fields,
	and a secondary index node pointer contains all index fields. */

	return(cursor->low_match
	       >= dict_index_get_n_unique_in_tree(cursor->index())
	       && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
}

/** Insert the externally stored fields (off-page columns)
of a clustered index entry.
@param[in]	entry	index entry to insert
@param[in]	big_rec	externally stored fields
@param[in,out]	offsets	rec_get_offsets()
@param[in,out]	heap	memory heap
@param[in]	thd	client connection, or NULL
@param[in]	index	clustered index
@return	error code
@retval	DB_SUCCESS
@retval DB_OUT_OF_FILE_SPACE */
static
dberr_t
row_ins_index_entry_big_rec(
	const dtuple_t*		entry,
	const big_rec_t*	big_rec,
	rec_offs*		offsets,
	mem_heap_t**		heap,
	dict_index_t*		index,
	const void*		thd __attribute__((unused)))
{
	mtr_t		mtr;
	btr_pcur_t	pcur;
	rec_t*		rec;

	pcur.btr_cur.page_cur.index = index;
	ut_ad(index->is_primary());

	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");

	mtr.start();
	if (index->table->is_temporary()) {
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	} else {
		index->set_modified(mtr);
	}

	dberr_t error = btr_pcur_open(entry, PAGE_CUR_LE, BTR_MODIFY_TREE,
				      &pcur, &mtr);
	if (error != DB_SUCCESS) {
		return error;
	}

	rec = btr_pcur_get_rec(&pcur);
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  ULINT_UNDEFINED, heap);

	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
	error = btr_store_big_rec_extern_fields(
		&pcur, offsets, big_rec, &mtr, BTR_STORE_INSERT);
	DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");

	mtr.commit();

	ut_free(pcur.old_rec_buf);
	return(error);
}

/** Check whether the executed sql command is from insert
statement
@param thd thread information
@return true if it is insert statement */
static bool thd_sql_is_insert(const THD *thd) noexcept
{
  switch(thd_sql_command(thd))
  {
    case SQLCOM_INSERT:
    case SQLCOM_INSERT_SELECT:
      return true;
    default:
      return false;
  }
}

#if defined __aarch64__&&defined __GNUC__&&__GNUC__==4&&!defined __clang__
/* Avoid GCC 4.8.5 internal compiler error due to srw_mutex::wr_unlock().
We would only need this for row_ins_clust_index_entry_low(),
but GCC 4.8.5 does not support pop_options. */
# pragma GCC optimize ("O0")
#endif

/***************************************************************//**
Tries to insert an entry into a clustered index, ignoring foreign key
constraints. If a record with the same unique key is found, the other
record is necessarily marked deleted by a committed transaction, or a
unique key violation error occurs. The delete marked record is then
updated to an existing record, and we must write an undo log record on
the delete marked record.
@retval DB_SUCCESS on success
@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@return error code */
dberr_t
row_ins_clust_index_entry_low(
/*==========================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_latch_mode	mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
				depending on whether we wish optimistic or
				pessimistic descent down the index tree */
	dict_index_t*	index,	/*!< in: clustered index */
	ulint		n_uniq,	/*!< in: 0 or index->n_uniq */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	que_thr_t*	thr)	/*!< in: query thread */
{
	btr_pcur_t	pcur;
	dberr_t		err		= DB_SUCCESS;
	big_rec_t*	big_rec		= NULL;
	mtr_t		mtr;
	uint64_t	auto_inc	= 0;
	mem_heap_t*	offsets_heap	= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets         = offsets_;
	rec_offs_init(offsets_);
	trx_t*		trx	= thr_get_trx(thr);
	buf_block_t*	block;

	DBUG_ENTER("row_ins_clust_index_entry_low");

	ut_ad(dict_index_is_clust(index));
	ut_ad(!dict_index_is_unique(index)
	      || n_uniq == dict_index_get_n_unique(index));
	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
	ut_ad(!trx->in_rollback);

	mtr.start();

	if (index->table->is_temporary()) {
		/* Disable REDO logging as the lifetime of temp-tables is
		limited to server or connection lifetime and so REDO
		information is not needed on restart for recovery.
		Disable locking as temp-tables are local to a connection. */

		ut_ad(flags & BTR_NO_LOCKING_FLAG);
		ut_ad(!dict_index_is_online_ddl(index));
		ut_ad(!index->table->persistent_autoinc);
		ut_ad(!index->is_instant());
		ut_ad(!entry->info_bits);
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	} else {
		index->set_modified(mtr);

		if (UNIV_UNLIKELY(entry->info_bits != 0)) {
			ut_ad(entry->is_metadata());
			ut_ad(index->is_instant());
			ut_ad(!dict_index_is_online_ddl(index));
			ut_ad(mode == BTR_MODIFY_TREE);
			ut_ad(flags == BTR_NO_LOCKING_FLAG);
		} else {
			if (mode == BTR_MODIFY_LEAF
			    && dict_index_is_online_ddl(index)) {
				mode = BTR_MODIFY_LEAF_ALREADY_LATCHED;
				mtr_s_lock_index(index, &mtr);
			}

			if (unsigned ai = index->table->persistent_autoinc) {
				/* Prepare to persist the AUTO_INCREMENT value
				from the index entry to PAGE_ROOT_AUTO_INC. */
				const dfield_t* dfield = dtuple_get_nth_field(
					entry, ai - 1);
				if (!dfield_is_null(dfield)) {
					auto_inc = row_parse_int(
						static_cast<const byte*>(
							dfield->data),
						dfield->len,
						dfield->type.mtype,
						dfield->type.prtype
						& DATA_UNSIGNED);
					if (auto_inc
					    && mode != BTR_MODIFY_TREE) {
						mode = btr_latch_mode(
							BTR_MODIFY_ROOT_AND_LEAF
							^ BTR_MODIFY_LEAF
							^ mode);
					}
				}
			}
		}
	}

	/* Note that we use PAGE_CUR_LE as the search mode, because then
	the function will return in both low_match and up_match of the
	cursor sensible values */
	pcur.btr_cur.page_cur.index = index;
	err = btr_pcur_open(entry, PAGE_CUR_LE, mode, &pcur, &mtr);
	if (err != DB_SUCCESS) {
		index->table->file_unreadable = true;
err_exit:
		mtr.commit();
		goto func_exit;
	}

	if (auto_inc) {
		buf_block_t* root
			= mtr.at_savepoint(mode != BTR_MODIFY_ROOT_AND_LEAF);
		ut_ad(index->page == root->page.id().page_no());
		page_set_autoinc(root, auto_inc, &mtr, false);
	}

#ifdef UNIV_DEBUG
	{
		page_t*	page = btr_pcur_get_page(&pcur);
		rec_t*	first_rec = page_rec_get_next(
			page_get_infimum_rec(page));

		ut_ad(page_rec_is_supremum(first_rec)
		      || rec_n_fields_is_sane(index, first_rec, entry));
	}
#endif /* UNIV_DEBUG */

	block = btr_pcur_get_block(&pcur);

	DBUG_EXECUTE_IF("row_ins_row_level", goto row_level_insert;);

	if (!(flags & BTR_NO_UNDO_LOG_FLAG)
	    && page_is_empty(block->page.frame)
	    && !entry->is_metadata() && !trx->duplicates
	    && !trx->check_unique_secondary && !trx->check_foreigns
	    && !trx->dict_operation
	    && block->page.id().page_no() == index->page
	    && !index->table->is_native_online_ddl()
	    && (!dict_table_is_partition(index->table)
		|| thd_sql_is_insert(trx->mysql_thd))) {

		if (!index->table->n_rec_locks
		    && !index->table->versioned()
		    && !index->table->is_temporary()
		    && !index->table->has_spatial_index()) {

			ut_ad(!index->table->skip_alter_undo);
			trx->bulk_insert = true;
			err = lock_table(index->table, NULL, LOCK_X, thr);
			if (err != DB_SUCCESS) {
				trx->error_state = err;
				trx->bulk_insert = false;
				goto err_exit;
			}
			if (index->table->n_rec_locks) {
avoid_bulk:
				trx->bulk_insert = false;
				goto row_level_insert;
			}
#ifdef WITH_WSREP
			if (trx->is_wsrep())
			{
				if (!wsrep_thd_is_local_transaction(trx->mysql_thd))
					goto row_level_insert;
				if (wsrep_append_table_key(trx->mysql_thd, *index->table))
				{
					trx->error_state = DB_ROLLBACK;
					goto err_exit;
				}
			}
#endif /* WITH_WSREP */

#ifdef BTR_CUR_HASH_ADAPT
			auto &part = btr_search.get_part(*index);
			part.latch.wr_lock(SRW_LOCK_CALL);
			index->table->bulk_trx_id = trx->id;
			part.latch.wr_unlock();
#else /* BTR_CUR_HASH_ADAPT */
			index->table->bulk_trx_id = trx->id;
#endif /* BTR_CUR_HASH_ADAPT */

			/* Write TRX_UNDO_EMPTY undo log and
			start buffering the insert operation */
			err = trx_undo_report_row_operation(
				thr, index, entry,
				nullptr, 0, nullptr, nullptr,
				nullptr);

			if (err != DB_SUCCESS) {
				goto avoid_bulk;
			}

			export_vars.innodb_bulk_operations++;
			goto err_exit;
		}
	} else if (flags == (BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG)
		   && !index->table->n_rec_locks) {

		ut_ad(index->table->skip_alter_undo);
		ut_ad(!entry->is_metadata());

		/* If foreign key exist and foreign key is enabled
		then avoid using bulk insert for copy algorithm */
		if (innodb_alter_copy_bulk
		    && !index->table->is_temporary()
		    && !index->table->versioned()
		    && !index->table->has_spatial_index()
		    && (!trx->check_foreigns
                        || (index->table->foreign_set.empty()
                            && index->table->referenced_set.empty()))) {
			ut_ad(page_is_empty(block->page.frame));
			/* This code path has been executed at the
			start of the alter operation. Consecutive
			insert operation are buffered in the
			bulk buffer and doesn't check for constraint
			validity of foreign key relationship. */
			trx_start_if_not_started(trx, true);
			trx->bulk_insert = true;
			auto m = trx->mod_tables.emplace(index->table, 0);
			m.first->second.start_bulk_insert(index->table, true);
			err = m.first->second.bulk_insert_buffered(
					*entry, *index, trx);
			goto err_exit;
		}
	}

row_level_insert:
	if (UNIV_UNLIKELY(entry->info_bits != 0)) {
		const rec_t* rec = btr_pcur_get_rec(&pcur);

		if (rec_get_info_bits(rec,
				      page_is_comp(btr_pcur_get_page(&pcur)))
		    & REC_INFO_MIN_REC_FLAG) {
			trx->error_info = index;
			err = DB_DUPLICATE_KEY;
			goto err_exit;
		}

		ut_ad(!row_ins_must_modify_rec(&pcur.btr_cur));
		goto do_insert;
	}

	if (rec_is_metadata(btr_pcur_get_rec(&pcur), *index)) {
		goto do_insert;
	}

	if (n_uniq
	    && (pcur.btr_cur.up_match >= n_uniq
		|| pcur.btr_cur.low_match >= n_uniq)) {

		if (flags
		    == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
			/* Set no locks when applying log
			in online table rebuild. Only check for duplicates. */
			err = row_ins_duplicate_error_in_clust_online(
				n_uniq, entry, &pcur.btr_cur,
				&offsets, &offsets_heap);

			switch (err) {
			case DB_SUCCESS:
				break;
			default:
				ut_ad(0);
				/* fall through */
			case DB_SUCCESS_LOCKED_REC:
			case DB_DUPLICATE_KEY:
				trx->error_info = index;
			}
		} else {
			/* Note that the following may return also
			DB_LOCK_WAIT */

			err = row_ins_duplicate_error_in_clust(
				flags, &pcur.btr_cur, entry, thr);
		}

		if (err != DB_SUCCESS) {
			goto err_exit;
		}
	}

	/* Note: Allowing duplicates would qualify for modification of
	an existing record as the new entry is exactly same as old entry. */
	if (row_ins_must_modify_rec(&pcur.btr_cur)) {
		/* There is already an index entry with a long enough common
		prefix, we must convert the insert into a modify of an
		existing record */
		mem_heap_t*	entry_heap	= mem_heap_create(1024);

		err = row_ins_clust_index_entry_by_modify(
			&pcur, flags, mode, &offsets, &offsets_heap,
			entry_heap, entry, thr, &mtr);

		mtr_commit(&mtr);
		mem_heap_free(entry_heap);
	} else {
		if (index->is_instant()) entry->trim(*index);
do_insert:
		rec_t*	insert_rec;

		if (mode != BTR_MODIFY_TREE) {
			ut_ad(mode == BTR_MODIFY_LEAF
			      || mode == BTR_MODIFY_LEAF_ALREADY_LATCHED
			      || mode == BTR_MODIFY_ROOT_AND_LEAF
			      || mode
			      == BTR_MODIFY_ROOT_AND_LEAF_ALREADY_LATCHED);
			err = btr_cur_optimistic_insert(
				flags, &pcur.btr_cur, &offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);
		} else {
			if (buf_pool.running_out()) {
				err = DB_LOCK_TABLE_FULL;
				goto err_exit;
			}

			err = btr_cur_optimistic_insert(
				flags, &pcur.btr_cur,
				&offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);

			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					flags, &pcur.btr_cur,
					&offsets, &offsets_heap,
					entry, &insert_rec, &big_rec,
					n_ext, thr, &mtr);
			}
		}

		if (err == DB_SUCCESS && entry->info_bits) {
			if (buf_block_t* root
			    = btr_root_block_get(index, RW_X_LATCH, &mtr,
						 &err)) {
				btr_set_instant(root, *index, &mtr);
			} else {
				ut_ad("cannot find root page" == 0);
			}
		}

		mtr.commit();

		if (big_rec) {
			ut_ad(err == DB_SUCCESS);
			/* Online table rebuild could read (and
			ignore) the incomplete record at this point.
			If online rebuild is in progress, the
			row_ins_index_entry_big_rec() will write log. */

			DBUG_EXECUTE_IF(
				"row_ins_extern_checkpoint",
				log_write_up_to(mtr.commit_lsn(), true););
			err = row_ins_index_entry_big_rec(
				entry, big_rec, offsets, &offsets_heap, index,
				trx->mysql_thd);
			dtuple_convert_back_big_rec(index, entry, big_rec);
		}
	}

func_exit:
	if (offsets_heap != NULL) {
		mem_heap_free(offsets_heap);
	}

	ut_free(pcur.old_rec_buf);
	DBUG_RETURN(err);
}

/** Start a mini-transaction.
@param[in,out]	mtr		mini-transaction
@param[in,out]	index		secondary index */
static void row_ins_sec_mtr_start(mtr_t *mtr, dict_index_t *index)
{
	ut_ad(!dict_index_is_clust(index));
	ut_ad(mtr->is_named_space(index->table->space));

	const mtr_log_t	log_mode = mtr->get_log_mode();

	mtr->start();
	index->set_modified(*mtr);
	mtr->set_log_mode(log_mode);
}

/***************************************************************//**
Tries to insert an entry into a secondary index. If a record with exactly the
same fields is found, the other record is necessarily marked deleted.
It is then unmarked. Otherwise, the entry is just inserted to the index.
@retval DB_SUCCESS on success
@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
@retval DB_FAIL if retry with BTR_INSERT_TREE is needed
@return error code */
dberr_t
row_ins_sec_index_entry_low(
/*========================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_latch_mode	mode,	/*!< in: BTR_MODIFY_LEAF or BTR_INSERT_TREE,
				depending on whether we wish optimistic or
				pessimistic descent down the index tree */
	dict_index_t*	index,	/*!< in: secondary index */
	mem_heap_t*	offsets_heap,
				/*!< in/out: memory heap that can be emptied */
	mem_heap_t*	heap,	/*!< in/out: memory heap */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	trx_id_t	trx_id,	/*!< in: PAGE_MAX_TRX_ID during
				row_log_table_apply(), or 0 */
	que_thr_t*	thr)	/*!< in: query thread */
{
	DBUG_ENTER("row_ins_sec_index_entry_low");

	btr_cur_t	cursor;
	btr_latch_mode	search_mode	= mode;
	dberr_t		err;
	ulint		n_unique;
	mtr_t		mtr;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets         = offsets_;
	rec_offs_init(offsets_);
	rtr_info_t	rtr_info;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_INSERT_TREE);

	cursor.rtr_info = NULL;
	cursor.page_cur.index = index;
	ut_ad(thr_get_trx(thr)->id != 0);

	mtr.start();

	if (index->table->is_temporary()) {
		/* Disable locking, because temporary tables are never
		shared between transactions or connections. */
		ut_ad(flags & BTR_NO_LOCKING_FLAG);
		mtr.set_log_mode(MTR_LOG_NO_REDO);
	} else {
		index->set_modified(mtr);
	}

	/* Note that we use PAGE_CUR_LE as the search mode, because then
	the function will return in both low_match and up_match of the
	cursor sensible values */

	if (index->is_spatial()) {
		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
		rtr_info.thr = thr;
		rtr_info_update_btr(&cursor, &rtr_info);

		err = rtr_insert_leaf(&cursor, thr, entry, search_mode, &mtr);

		if (err == DB_SUCCESS && search_mode == BTR_MODIFY_LEAF
		    && rtr_info.mbr_adj) {
			mtr_commit(&mtr);
			search_mode = mode = BTR_MODIFY_TREE;
			rtr_clean_rtr_info(&rtr_info, true);
			rtr_init_rtr_info(&rtr_info, false, &cursor,
					  index, false);
			rtr_info.thr = thr;
			rtr_info_update_btr(&cursor, &rtr_info);
			mtr.start();
			if (index->table->is_temporary()) {
				mtr.set_log_mode(MTR_LOG_NO_REDO);
			} else {
				index->set_modified(mtr);
			}
			err = rtr_insert_leaf(&cursor, thr, entry,
					      search_mode, &mtr);
		}

		DBUG_EXECUTE_IF(
			"rtree_test_check_count", {
			goto func_exit;});

	} else {
		err = cursor.search_leaf(entry, PAGE_CUR_LE, search_mode,
					 &mtr);
	}

	if (err != DB_SUCCESS) {
		if (err == DB_DECRYPTION_FAILED) {
			innodb_decryption_failed(thr_get_trx(thr)->mysql_thd,
						 index->table);
		}
		goto func_exit;
	}

#ifdef UNIV_DEBUG
	{
		page_t*	page = btr_cur_get_page(&cursor);
		rec_t*	first_rec = page_rec_get_next(
			page_get_infimum_rec(page));

		ut_ad(page_rec_is_supremum(first_rec)
		      || rec_n_fields_is_sane(index, first_rec, entry));
	}
#endif /* UNIV_DEBUG */

	n_unique = dict_index_get_n_unique(index);

	if (dict_index_is_unique(index)
	    && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
		mtr_commit(&mtr);

		DEBUG_SYNC_C("row_ins_sec_index_unique");

		row_ins_sec_mtr_start(&mtr, index);

		err = row_ins_scan_sec_index_for_duplicate(
			flags, index, entry, thr, &mtr, offsets_heap);

		mtr_commit(&mtr);

		switch (err) {
		case DB_SUCCESS:
			break;
		case DB_DUPLICATE_KEY:
			if (!index->is_committed()) {
				ut_ad(!thr_get_trx(thr)
				      ->dict_operation_lock_mode);
				index->type |= DICT_CORRUPT;
				/* Do not return any error to the
				caller. The duplicate will be reported
				by ALTER TABLE or CREATE UNIQUE INDEX.
				Unfortunately we cannot report the
				duplicate key value to the DDL thread,
				because the altered_table object is
				private to its call stack. */
				err = DB_SUCCESS;
			}
			/* fall through */
		default:
			if (dict_index_is_spatial(index)) {
				rtr_clean_rtr_info(&rtr_info, true);
			}
			DBUG_RETURN(err);
		}

		row_ins_sec_mtr_start(&mtr, index);

		DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");

		/* We did not find a duplicate and we have now
		locked with s-locks the necessary records to
		prevent any insertion of a duplicate by another
		transaction. Let us now reposition the cursor and
		continue the insertion. */
		err = cursor.search_leaf(entry, PAGE_CUR_LE, search_mode,
					 &mtr);
		if (err != DB_SUCCESS) {
			goto func_exit;
		}
	}

	if (row_ins_must_modify_rec(&cursor)) {
		/* There is already an index entry with a long enough common
		prefix, we must convert the insert into a modify of an
		existing record */
		offsets = rec_get_offsets(
			btr_cur_get_rec(&cursor), index, offsets,
			index->n_core_fields,
			ULINT_UNDEFINED, &offsets_heap);

		err = row_ins_sec_index_entry_by_modify(
			flags, mode, &cursor, &offsets,
			offsets_heap, heap, entry, thr, &mtr);

		if (err == DB_SUCCESS && dict_index_is_spatial(index)
		    && rtr_info.mbr_adj) {
			err = rtr_ins_enlarge_mbr(&cursor, &mtr);
		}
	} else {
		rec_t*		insert_rec;
		big_rec_t*	big_rec;

		if (mode == BTR_MODIFY_LEAF) {
			err = btr_cur_optimistic_insert(
				flags, &cursor, &offsets, &offsets_heap,
				entry, &insert_rec,
				&big_rec, 0, thr, &mtr);
			if (err == DB_SUCCESS
			    && dict_index_is_spatial(index)
			    && rtr_info.mbr_adj) {
				err = rtr_ins_enlarge_mbr(&cursor, &mtr);
			}
		} else {
			if (buf_pool.running_out()) {
				err = DB_LOCK_TABLE_FULL;
				goto func_exit;
			}

			err = btr_cur_optimistic_insert(
				flags, &cursor,
				&offsets, &offsets_heap,
				entry, &insert_rec,
				&big_rec, 0, thr, &mtr);
			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					flags, &cursor,
					&offsets, &offsets_heap,
					entry, &insert_rec,
					&big_rec, 0, thr, &mtr);
			}
			if (err == DB_SUCCESS
				   && dict_index_is_spatial(index)
				   && rtr_info.mbr_adj) {
				err = rtr_ins_enlarge_mbr(&cursor, &mtr);
			}
		}

		if (err == DB_SUCCESS && trx_id) {
			page_update_max_trx_id(
				btr_cur_get_block(&cursor),
				btr_cur_get_page_zip(&cursor),
				trx_id, &mtr);
		}

		ut_ad(!big_rec);
	}

func_exit:
	if (dict_index_is_spatial(index)) {
		rtr_clean_rtr_info(&rtr_info, true);
	}

	mtr_commit(&mtr);
	DBUG_RETURN(err);
}

/***************************************************************//**
Inserts an entry into a clustered index. Tries first optimistic,
then pessimistic descent down the tree. If the entry matches enough
to a delete marked record, performs the insert by updating or delete
unmarking the delete marked record.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
dberr_t
row_ins_clust_index_entry(
/*======================*/
	dict_index_t*	index,	/*!< in: clustered index */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	que_thr_t*	thr,	/*!< in: query thread */
	ulint		n_ext)	/*!< in: number of externally stored columns */
{
	dberr_t	err;
	ulint	n_uniq;

	DBUG_ENTER("row_ins_clust_index_entry");

	if (!index->table->foreign_set.empty()) {
		err = row_ins_check_foreign_constraints(
			index->table, index, true, entry, thr);
		if (err != DB_SUCCESS) {

			DBUG_RETURN(err);
		}
	}

	n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;

#ifdef WITH_WSREP
	const bool skip_locking
		= thr_get_trx(thr)->is_wsrep() &&
		  wsrep_thd_skip_locking(thr_get_trx(thr)->mysql_thd);
	ulint	flags = index->table->no_rollback() ? BTR_NO_ROLLBACK
		: (index->table->is_temporary() || skip_locking)
		? BTR_NO_LOCKING_FLAG : 0;
#ifdef UNIV_DEBUG
	if (skip_locking && strcmp(wsrep_get_sr_table_name(),
                                   index->table->name.m_name)) {
		WSREP_ERROR("Record locking is disabled in this thread, "
			    "but the table being modified is not "
			    "`%s`: `%s`.", wsrep_get_sr_table_name(),
			    index->table->name.m_name);
		ut_error;
	}
#endif /* UNIV_DEBUG */
#else
	ulint	flags = index->table->no_rollback() ? BTR_NO_ROLLBACK
		: index->table->is_temporary()
		? BTR_NO_LOCKING_FLAG : 0;
#endif /* WITH_WSREP */
	const auto orig_n_fields = entry->n_fields;

	/* For intermediate table during copy alter table,
	   skip the undo log and record lock checking for
	   insertion operation.
	*/
	if (index->table->skip_alter_undo) {
		flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
	}

	/* Try first optimistic descent to the B-tree */
	log_free_check();

	err = row_ins_clust_index_entry_low(
		flags, BTR_MODIFY_LEAF, index, n_uniq, entry,
		n_ext, thr);

	entry->n_fields = orig_n_fields;

	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
			    "after_row_ins_clust_index_entry_leaf");

	if (err != DB_FAIL) {
		DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
		DBUG_RETURN(err);
	}

	/* Try then pessimistic descent to the B-tree */
	log_free_check();

	err = row_ins_clust_index_entry_low(
		flags, BTR_MODIFY_TREE, index, n_uniq, entry,
		n_ext, thr);

	entry->n_fields = orig_n_fields;

	DBUG_RETURN(err);
}

/***************************************************************//**
Inserts an entry into a secondary index. Tries first optimistic,
then pessimistic descent down the tree. If the entry matches enough
to a delete marked record, performs the insert by updating or delete
unmarking the delete marked record.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
dberr_t
row_ins_sec_index_entry(
/*====================*/
	dict_index_t*	index,	/*!< in: secondary index */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	que_thr_t*	thr,	/*!< in: query thread */
	bool		check_foreign) /*!< in: true if check
				foreign table is needed, false otherwise */
{
	dberr_t		err = DB_SUCCESS;
	mem_heap_t*	offsets_heap;
	mem_heap_t*	heap;
	trx_id_t	trx_id  = 0;

	DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
			DBUG_SET("-d,row_ins_sec_index_entry_timeout");
			return(DB_LOCK_WAIT);});

	if (check_foreign && !index->table->foreign_set.empty()) {
		err = row_ins_check_foreign_constraints(index->table, index,
							false, entry, thr);
		if (err != DB_SUCCESS) {

			return(err);
		}
	}

	ut_ad(thr_get_trx(thr)->id != 0);

	offsets_heap = mem_heap_create(1024);
	heap = mem_heap_create(1024);

	/* Try first optimistic descent to the B-tree */

	log_free_check();
	ulint flags = index->table->is_temporary()
		? BTR_NO_LOCKING_FLAG
		: 0;

	/* For intermediate table during copy alter table,
	   skip the undo log and record lock checking for
	   insertion operation.
	*/
	if (index->table->skip_alter_undo) {
		trx_id = thr_get_trx(thr)->id;
		flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
	}

	err = row_ins_sec_index_entry_low(
		flags, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry,
		trx_id, thr);
	if (err == DB_FAIL) {
		mem_heap_empty(heap);

		/* Try then pessimistic descent to the B-tree */
		log_free_check();

		err = row_ins_sec_index_entry_low(
			flags, BTR_INSERT_TREE, index,
			offsets_heap, heap, entry, 0, thr);
	}

	mem_heap_free(heap);
	mem_heap_free(offsets_heap);
	return(err);
}

/***************************************************************//**
Inserts an index entry to index. Tries first optimistic, then pessimistic
descent down the tree. If the entry matches enough to a delete marked record,
performs the insert by updating or delete unmarking the delete marked
record.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
static
dberr_t
row_ins_index_entry(
/*================*/
	dict_index_t*	index,	/*!< in: index */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	que_thr_t*	thr)	/*!< in: query thread */
{
	trx_t* trx = thr_get_trx(thr);

	ut_ad(trx->id || index->table->no_rollback()
	      || index->table->is_temporary());

	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
			DBUG_SET("-d,row_ins_index_entry_timeout");
			return(DB_LOCK_WAIT);});

	if (index->is_btree()) {
		/* If the InnoDB skips the sorting of primary
		index for bulk insert operation then InnoDB
		should have called load_one_row() for the
		first insert statement and shouldn't use
		buffer for consecutive insert statement */
		if (auto t= trx->use_bulk_buffer(index)) {
			/* MDEV-25036 FIXME:
			row_ins_check_foreign_constraint() check
			should be done before buffering the insert
			operation. */
			ut_ad(index->table->skip_alter_undo
			      || !trx->check_foreigns);
			return t->bulk_insert_buffered(*entry, *index, trx);
		}
	}

	if (index->is_primary()) {
		return row_ins_clust_index_entry(index, entry, thr, 0);
	} else {
		return row_ins_sec_index_entry(index, entry, thr);
	}
}


/*****************************************************************//**
This function generate MBR (Minimum Bounding Box) for spatial objects
and set it to spatial index field. */
static
void
row_ins_spatial_index_entry_set_mbr_field(
/*======================================*/
	dfield_t*	field,		/*!< in/out: mbr field */
	const dfield_t*	row_field)	/*!< in: row field */
{
	ulint		dlen = 0;
	double		mbr[SPDIMS * 2];

	/* This must be a GEOMETRY datatype */
	ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));

	const byte* dptr = static_cast<const byte*>(
		dfield_get_data(row_field));
	dlen = dfield_get_len(row_field);

	/* obtain the MBR */
	rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
			   static_cast<uint>(dlen - GEO_DATA_HEADER_SIZE),
			   SPDIMS, mbr);

	/* Set mbr as index entry data */
	dfield_write_mbr(field, mbr);
}

/** Sets the values of the dtuple fields in entry from the values of appropriate
columns in row.
@param[in]	node	row insert node
@param[in]	thr	query thread
@retval DB_SUCCESS if the set is successful
@retval DB_CANT_CREATE_GEOMETRY_OBJECT when spatial index fails to
create geometry object */
static
dberr_t
row_ins_index_entry_set_vals(const ins_node_t* node, que_thr_t* thr)
{
	const dict_index_t* index = node->index;
	dtuple_t* entry = *node->entry;
	const dtuple_t* row = node->row;
	ulint	num_v = dtuple_get_n_v_fields(entry);
	ulint n_fields = dtuple_get_n_fields(entry);

	for (ulint i = 0; i < n_fields + num_v; i++) {
		dict_field_t*	ind_field = NULL;
		dfield_t*	field;
		const dfield_t*	row_field;
		ulint		len;
		dict_col_t*	col;

		if (i >= n_fields) {
			/* This is virtual field */
			field = dtuple_get_nth_v_field(entry, i - n_fields);
			col = &dict_table_get_nth_v_col(
				index->table, i - n_fields)->m_col;
		} else {
			field = dtuple_get_nth_field(entry, i);
			ind_field = dict_index_get_nth_field(index, i);
			col = ind_field->col;
		}

		if (col->is_virtual()) {
			const dict_v_col_t*     v_col
				= reinterpret_cast<const dict_v_col_t*>(col);
			ut_ad(dtuple_get_n_fields(row)
			      == dict_table_get_n_cols(index->table));
			row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
		} else if (col->is_dropped()) {
			ut_ad(index->is_primary());

			if (!(col->prtype & DATA_NOT_NULL)) {
				field->data = NULL;
				field->len = UNIV_SQL_NULL;
				field->type.prtype = DATA_BINARY_TYPE;
			} else {
				ut_ad(ind_field->fixed_len <= col->len);
				dfield_set_data(field, field_ref_zero,
						ind_field->fixed_len);
				field->type.prtype = DATA_NOT_NULL;
			}

			field->type.mtype = col->len
				? DATA_FIXBINARY : DATA_BINARY;
			continue;
		} else {
			row_field = dtuple_get_nth_field(
				row, ind_field->col->ind);
		}

		len = dfield_get_len(row_field);

		/* Check column prefix indexes */
		if (ind_field != NULL && ind_field->prefix_len > 0
		    && len != UNIV_SQL_NULL) {

			const	dict_col_t*	col
				= dict_field_get_col(ind_field);

			len = dtype_get_at_most_n_mbchars(
				col->prtype, col->mbminlen, col->mbmaxlen,
				ind_field->prefix_len,
				len,
				static_cast<const char*>(
					dfield_get_data(row_field)));

			ut_ad(!dfield_is_ext(row_field));
		}

		/* Handle spatial index. For the first field, replace
		the data with its MBR (Minimum Bounding Box). */
		if ((i == 0) && dict_index_is_spatial(index)) {
			if (!row_field->data
			    || row_field->len < GEO_DATA_HEADER_SIZE) {
				thr_get_trx(thr)->error_info = index;
				return(DB_CANT_CREATE_GEOMETRY_OBJECT);
			}
			row_ins_spatial_index_entry_set_mbr_field(
				field, row_field);
			continue;
		}

		dfield_set_data(field, dfield_get_data(row_field), len);
		if (dfield_is_ext(row_field)) {
			ut_ad(dict_index_is_clust(index));
			dfield_set_ext(field);
		}
	}

	return(DB_SUCCESS);
}

/***********************************************************//**
Inserts a single index entry to the table.
@return DB_SUCCESS if operation successfully completed, else error
code or DB_LOCK_WAIT */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_index_entry_step(
/*=====================*/
	ins_node_t*	node,	/*!< in: row insert node */
	que_thr_t*	thr)	/*!< in: query thread */
{
	dberr_t	err;

	DBUG_ENTER("row_ins_index_entry_step");

	ut_ad(dtuple_check_typed(node->row));

	err = row_ins_index_entry_set_vals(node, thr);

	if (err != DB_SUCCESS) {
		DBUG_RETURN(err);
	}

	ut_ad(dtuple_check_typed(*node->entry));

	err = row_ins_index_entry(node->index, *node->entry, thr);

	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
			    "after_row_ins_index_entry_step");

	DBUG_RETURN(err);
}

/***********************************************************//**
Gets a row to insert from the values list. */
UNIV_INLINE
void
row_ins_get_row_from_values(
/*========================*/
	ins_node_t*	node)	/*!< in: row insert node */
{
	que_node_t*	list_node;
	dfield_t*	dfield;
	dtuple_t*	row;
	ulint		i;

	/* The field values are copied in the buffers of the select node and
	it is safe to use them until we fetch from select again: therefore
	we can just copy the pointers */

	row = node->row;

	i = 0;
	list_node = node->values_list;

	while (list_node) {
		eval_exp(list_node);

		dfield = dtuple_get_nth_field(row, i);
		dfield_copy_data(dfield, que_node_get_val(list_node));

		i++;
		list_node = que_node_get_next(list_node);
	}
}

/***********************************************************//**
Gets a row to insert from the select list. */
UNIV_INLINE
void
row_ins_get_row_from_select(
/*========================*/
	ins_node_t*	node)	/*!< in: row insert node */
{
	que_node_t*	list_node;
	dfield_t*	dfield;
	dtuple_t*	row;
	ulint		i;

	/* The field values are copied in the buffers of the select node and
	it is safe to use them until we fetch from select again: therefore
	we can just copy the pointers */

	row = node->row;

	i = 0;
	list_node = node->select->select_list;

	while (list_node) {
		dfield = dtuple_get_nth_field(row, i);
		dfield_copy_data(dfield, que_node_get_val(list_node));

		i++;
		list_node = que_node_get_next(list_node);
	}
}

/***********************************************************//**
Inserts a row to a table.
@return DB_SUCCESS if operation successfully completed, else error
code or DB_LOCK_WAIT */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins(
/*====*/
	ins_node_t*	node,	/*!< in: row insert node */
	que_thr_t*	thr)	/*!< in: query thread */
{
	DBUG_ENTER("row_ins");

	DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));

	if (node->state == INS_NODE_ALLOC_ROW_ID) {
		node->index = dict_table_get_first_index(node->table);
		ut_ad(node->entry_list.empty() == false);
		node->entry = node->entry_list.begin();

		if (node->index->is_gen_clust()) {
			const uint64_t db_row_id{++node->table->row_id};
			if (db_row_id >> 48) {
				DBUG_RETURN(DB_OUT_OF_FILE_SPACE);
			}
			mach_write_to_6(node->sys_buf, db_row_id);
		}

		if (node->ins_type == INS_SEARCHED) {

			row_ins_get_row_from_select(node);

		} else if (node->ins_type == INS_VALUES) {

			row_ins_get_row_from_values(node);
		}

		node->state = INS_NODE_INSERT_ENTRIES;
	}

	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);

	while (dict_index_t *index = node->index) {
		if (index->type & (DICT_FTS | DICT_CORRUPT)
		    || !index->is_committed()) {
		} else if (dberr_t err = row_ins_index_entry_step(node, thr)) {
			DBUG_RETURN(err);
		}
		node->index = dict_table_get_next_index(index);
		++node->entry;
	}

	ut_ad(node->entry == node->entry_list.end());

	node->state = INS_NODE_ALLOC_ROW_ID;

	DBUG_RETURN(DB_SUCCESS);
}

/***********************************************************//**
Inserts a row to a table. This is a high-level function used in SQL execution
graphs.
@return query thread to run next or NULL */
que_thr_t*
row_ins_step(
/*=========*/
	que_thr_t*	thr)	/*!< in: query thread */
{
	ins_node_t*	node;
	que_node_t*	parent;
	sel_node_t*	sel_node;
	trx_t*		trx;
	dberr_t		err;

	ut_ad(thr);

	DEBUG_SYNC_C("innodb_row_ins_step_enter");

	trx = thr_get_trx(thr);

	node = static_cast<ins_node_t*>(thr->run_node);

	ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);

	parent = que_node_get_parent(node);
	sel_node = node->select;

	if (thr->prev_node == parent) {
		node->state = INS_NODE_SET_IX_LOCK;
	}

	/* If this is the first time this node is executed (or when
	execution resumes after wait for the table IX lock), set an
	IX lock on the table and reset the possible select node. MySQL's
	partitioned table code may also call an insert within the same
	SQL statement AFTER it has used this table handle to do a search.
	This happens, for example, when a row update moves it to another
	partition. In that case, we have already set the IX lock on the
	table during the search operation, and there is no need to set
	it again here. But we must write trx->id to node->sys_buf. */

	if (node->table->no_rollback()) {
		/* No-rollback tables should only be written to by a
		single thread at a time, but there can be multiple
		concurrent readers. We must hold an open table handle. */
		DBUG_ASSERT(node->table->get_ref_count() > 0);
		DBUG_ASSERT(node->ins_type == INS_DIRECT);
		/* No-rollback tables can consist only of a single index. */
		DBUG_ASSERT(node->entry_list.size() == 1);
		DBUG_ASSERT(UT_LIST_GET_LEN(node->table->indexes) == 1);
		/* There should be no possibility for interruption and
		restarting here. In theory, we could allow resumption
		from the INS_NODE_INSERT_ENTRIES state here. */
		DBUG_ASSERT(node->state == INS_NODE_SET_IX_LOCK);
		node->index = dict_table_get_first_index(node->table);
		node->entry = node->entry_list.begin();
		node->state = INS_NODE_INSERT_ENTRIES;
		goto do_insert;
	}

	if (node->state == INS_NODE_SET_IX_LOCK) {

		node->state = INS_NODE_ALLOC_ROW_ID;

		if (node->table->is_temporary()) {
			node->trx_id = trx->id;
		}

		/* It may be that the current session has not yet started
		its transaction, or it has been committed: */

		if (trx->id == node->trx_id) {
			/* No need to do IX-locking */

			goto same_trx;
		}

		err = lock_table(node->table, NULL, LOCK_IX, thr);

		DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
				err = DB_LOCK_WAIT;);

		if (err != DB_SUCCESS) {
			node->state = INS_NODE_SET_IX_LOCK;
			goto error_handling;
		}

		node->trx_id = trx->id;
same_trx:
		if (node->ins_type == INS_SEARCHED) {
			/* Reset the cursor */
			sel_node->state = SEL_NODE_OPEN;

			/* Fetch a row to insert */

			thr->run_node = sel_node;

			return(thr);
		}
	}

	if ((node->ins_type == INS_SEARCHED)
	    && (sel_node->state != SEL_NODE_FETCH)) {

		ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);

		/* No more rows to insert */
		thr->run_node = parent;

		return(thr);
	}
do_insert:
	/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */

	err = row_ins(node, thr);

error_handling:
	trx->error_state = err;

	if (err != DB_SUCCESS) {
		/* err == DB_LOCK_WAIT or SQL error detected */
		return(NULL);
	}

	/* DO THE TRIGGER ACTIONS HERE */

	if (node->ins_type == INS_SEARCHED) {
		/* Fetch a row to insert */

		thr->run_node = sel_node;
	} else {
		thr->run_node = que_node_get_parent(node);
	}

	return(thr);
}
